Skip to content

Commit 9f63abb

Browse files
committed
Add routing
1 parent ba3a3ee commit 9f63abb

File tree

4 files changed

+83
-34
lines changed

4 files changed

+83
-34
lines changed

sentry_streams/src/consumer.rs

+43-16
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
use crate::kafka_config::PyKafkaConsumerConfig;
99
use crate::operators::build;
1010
use crate::operators::RuntimeOperator;
11+
use crate::routes::Route;
12+
use crate::routes::RoutedValue;
1113
use pyo3::prelude::*;
1214
use pyo3::types::PyBytes;
1315
use sentry_arroyo::backends::kafka::types::KafkaPayload;
@@ -39,6 +41,8 @@ pub struct ArroyoConsumer {
3941

4042
topic: String,
4143

44+
source: String,
45+
4246
steps: Vec<Py<RuntimeOperator>>,
4347

4448
/// The ProcessorHandle allows the main thread to stop the StreamingProcessor
@@ -49,10 +53,11 @@ pub struct ArroyoConsumer {
4953
#[pymethods]
5054
impl ArroyoConsumer {
5155
#[new]
52-
fn new(kafka_config: PyKafkaConsumerConfig, topic: String) -> Self {
56+
fn new(source: String, kafka_config: PyKafkaConsumerConfig, topic: String) -> Self {
5357
ArroyoConsumer {
5458
consumer_config: kafka_config,
5559
topic,
60+
source,
5661
steps: Vec::new(),
5762
handle: None,
5863
}
@@ -77,7 +82,10 @@ impl ArroyoConsumer {
7782
.map(|step| step.clone_ref(py))
7883
.collect::<Vec<_>>()
7984
});
80-
let factory = ArroyoStreamingFactory { steps: steps_copy };
85+
let factory = ArroyoStreamingFactory {
86+
source: self.source.clone(),
87+
steps: steps_copy,
88+
};
8189
let config = self.consumer_config.clone().into();
8290
let processor = StreamProcessor::with_kafka(config, factory, Topic::new(&self.topic), None);
8391
self.handle = Some(processor.get_handle());
@@ -89,7 +97,7 @@ impl ArroyoConsumer {
8997
})
9098
.expect("Error setting Ctrl+C handler");
9199

92-
processor.run();
100+
let _ = processor.run();
93101
}
94102

95103
fn shutdown(&mut self) {
@@ -103,40 +111,45 @@ impl ArroyoConsumer {
103111
/// Converts a Message<KafkaPayload> to a Message<Py<PyAny>>.
104112
/// It takes the Kafka payload as bytes and turns it into a
105113
/// Python bytes object.
106-
fn to_python(message: Message<KafkaPayload>) -> Message<Py<PyAny>> {
107-
let payload = Python::with_gil(|py| {
108-
let payload = message.payload().payload().unwrap();
109-
let py_bytes = PyBytes::new(py, payload);
110-
py_bytes.into_any().unbind()
114+
fn to_python(source: &str, message: Message<KafkaPayload>) -> Message<RoutedValue> {
115+
let payload = Python::with_gil(|py| match message.payload().payload() {
116+
Some(payload) => PyBytes::new(py, payload).into_any().unbind(),
117+
None => py.None(),
111118
});
112-
message.replace(payload)
119+
let route = Route::new(source.to_string(), vec![]);
120+
message.replace(RoutedValue { route, payload })
113121
}
114122

115123
/// Builds the Arroyo StreamProcessor for this consumer.
116124
/// It plugs a Commit policy at the end and a translator at the beginning
117125
/// that takes the payload of the Kafka message and turns it into a Py<PyAny>
118-
fn build_chain(steps: &[Py<RuntimeOperator>]) -> Box<dyn ProcessingStrategy<KafkaPayload>> {
119-
let mut next: Box<dyn ProcessingStrategy<Py<PyAny>>> =
126+
fn build_chain(
127+
source: &str,
128+
steps: &[Py<RuntimeOperator>],
129+
) -> Box<dyn ProcessingStrategy<KafkaPayload>> {
130+
let mut next: Box<dyn ProcessingStrategy<RoutedValue>> =
120131
Box::new(CommitOffsets::new(Duration::from_secs(5)));
121132
for step in steps.iter().rev() {
122133
next = build(step, next);
123134
}
124135

125-
let converter = RunTask::new(
126-
|message: Message<KafkaPayload>| Ok(to_python(message)),
127-
next,
128-
);
136+
let copied_source = source.to_string();
137+
let conversion_function =
138+
move |message: Message<KafkaPayload>| Ok(to_python(&copied_source, message));
139+
140+
let converter = RunTask::new(conversion_function, next);
129141

130142
Box::new(converter)
131143
}
132144

133145
struct ArroyoStreamingFactory {
146+
source: String,
134147
steps: Vec<Py<RuntimeOperator>>,
135148
}
136149

137150
impl ProcessingStrategyFactory<KafkaPayload> for ArroyoStreamingFactory {
138151
fn create(&self) -> Box<dyn ProcessingStrategy<KafkaPayload>> {
139-
build_chain(&self.steps)
152+
build_chain(&self.source, &self.steps)
140153
}
141154
}
142155
#[cfg(test)]
@@ -167,6 +180,20 @@ mod tests {
167180
});
168181
}
169182

183+
#[test]
184+
fn test_to_none_python() {
185+
Python::with_gil(|py| {
186+
let message =
187+
Message::new_any_message(KafkaPayload::new(None, None, None), BTreeMap::new());
188+
189+
let python_message = to_python(message);
190+
191+
let py_payload = python_message.payload();
192+
193+
assert!(py_payload.is_none(py));
194+
});
195+
}
196+
170197
#[test]
171198
fn test_build_chain() {
172199
Python::with_gil(|py| {

sentry_streams/src/fake_strategy.rs

+7-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use super::*;
2+
use crate::routes::RoutedValue;
23
use sentry_arroyo::processing::strategies::{
34
CommitRequest, ProcessingStrategy, StrategyError, SubmitError,
45
};
@@ -10,13 +11,16 @@ pub struct FakeStrategy {
1011
pub submitted: Arc<Mutex<Vec<Py<PyAny>>>>,
1112
}
1213

13-
impl ProcessingStrategy<Py<PyAny>> for FakeStrategy {
14+
impl ProcessingStrategy<RoutedValue> for FakeStrategy {
1415
fn poll(&mut self) -> Result<Option<CommitRequest>, StrategyError> {
1516
Ok(None)
1617
}
1718

18-
fn submit(&mut self, message: Message<Py<PyAny>>) -> Result<(), SubmitError<Py<PyAny>>> {
19-
self.submitted.lock().unwrap().push(message.into_payload());
19+
fn submit(&mut self, message: Message<RoutedValue>) -> Result<(), SubmitError<RoutedValue>> {
20+
self.submitted
21+
.lock()
22+
.unwrap()
23+
.push(message.into_payload().payload);
2024
Ok(())
2125
}
2226

sentry_streams/src/operators.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::kafka_config::PyKafkaConsumerConfig;
2-
use crate::routes::Route;
2+
use crate::routes::{Route, RoutedValue};
33
use crate::transformer::build_map;
44
use pyo3::prelude::*;
55
use sentry_arroyo::processing::strategies::ProcessingStrategy;
@@ -77,8 +77,8 @@ impl RuntimeOperator {
7777

7878
pub fn build(
7979
step: &Py<RuntimeOperator>,
80-
next: Box<dyn ProcessingStrategy<Py<PyAny>>>,
81-
) -> Box<dyn ProcessingStrategy<Py<PyAny>>> {
80+
next: Box<dyn ProcessingStrategy<RoutedValue>>,
81+
) -> Box<dyn ProcessingStrategy<RoutedValue>> {
8282
match step.get() {
8383
RuntimeOperator::Map { function, .. } => {
8484
let func_ref = Python::with_gil(|py| function.clone_ref(py));

sentry_streams/src/transformer.rs

+30-12
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
1+
use crate::routes::RoutedValue;
12
use pyo3::prelude::*;
23
use sentry_arroyo::processing::strategies::run_task::RunTask;
34
use sentry_arroyo::processing::strategies::ProcessingStrategy;
45
use sentry_arroyo::types::Message;
56

67
/// Executes a Python callable with an Arroyo message containing Any and
78
/// returns the result.
8-
fn call_python_function(callable: &Py<PyAny>, message: &Message<Py<PyAny>>) -> Py<PyAny> {
9+
fn call_python_function(
10+
callable: &Py<PyAny>,
11+
message: &Message<RoutedValue>,
12+
) -> Result<Py<PyAny>, PyErr> {
913
Python::with_gil(|py| {
10-
let result = callable.call1(py, (message.payload(),)).unwrap();
11-
result
14+
let python_payload = message.payload().payload.clone_ref(py);
15+
callable.call1(py, (python_payload,))
1216
})
1317
}
1418

@@ -20,11 +24,17 @@ fn call_python_function(callable: &Py<PyAny>, message: &Message<Py<PyAny>>) -> P
2024
/// This function takes a `next` step to wire the Arroyo strategy to.
2125
pub fn build_map(
2226
callable: Py<PyAny>,
23-
next: Box<dyn ProcessingStrategy<Py<PyAny>>>,
24-
) -> Box<dyn ProcessingStrategy<Py<PyAny>>> {
25-
let mapper = move |message| {
27+
next: Box<dyn ProcessingStrategy<RoutedValue>>,
28+
) -> Box<dyn ProcessingStrategy<RoutedValue>> {
29+
let mapper = move |message: Message<RoutedValue>| {
30+
let route = message.payload().route.clone();
2631
let transformed = call_python_function(&callable, &message);
27-
Ok(message.replace(transformed))
32+
// TODO: Create an exception for Invalid messages in Python
33+
// This now crashes if the Python code fails.
34+
Ok(message.replace(RoutedValue {
35+
route,
36+
payload: transformed.unwrap(),
37+
}))
2838
};
2939
Box::new(RunTask::new(mapper, next))
3040
}
@@ -33,13 +43,23 @@ mod tests {
3343
use super::*;
3444
use crate::fake_strategy::assert_messages_match;
3545
use crate::fake_strategy::FakeStrategy;
46+
use crate::routes::Route;
3647
use pyo3::ffi::c_str;
3748
use pyo3::IntoPyObjectExt;
3849
use sentry_arroyo::processing::strategies::ProcessingStrategy;
3950
use std::collections::BTreeMap;
4051
use std::ops::Deref;
4152
use std::sync::{Arc, Mutex};
4253

54+
fn build_routed_value(py: Python<'_>) -> RoutedValue {
55+
let msg_payload = "test_message".into_py_any(py).unwrap();
56+
let route = Route::new("source".to_string(), vec!["waypoint".to_string()]);
57+
RoutedValue {
58+
route,
59+
payload: msg_payload,
60+
}
61+
}
62+
4363
#[test]
4464
fn test_build_map() {
4565
Python::with_gil(|py| {
@@ -56,8 +76,7 @@ mod tests {
5676

5777
let mut strategy = build_map(callable, Box::new(next_step));
5878

59-
let message =
60-
Message::new_any_message("test_message".into_py_any(py).unwrap(), BTreeMap::new());
79+
let message = Message::new_any_message(build_routed_value(py), BTreeMap::new());
6180
let result = strategy.submit(message);
6281

6382
assert!(result.is_ok());
@@ -79,10 +98,9 @@ mod tests {
7998
.into_py_any(py)
8099
.unwrap();
81100

82-
let message =
83-
Message::new_any_message("test_message".into_py_any(py).unwrap(), BTreeMap::new());
101+
let message = Message::new_any_message(build_routed_value(py), BTreeMap::new());
84102

85-
let result = call_python_function(&callable, &message);
103+
let result = call_python_function(&callable, &message).unwrap();
86104

87105
assert_eq!(
88106
result.extract::<String>(py).unwrap(),

0 commit comments

Comments
 (0)