Skip to content

Commit bd59e6f

Browse files
committed
Add producer
1 parent 10e66f7 commit bd59e6f

File tree

9 files changed

+320
-23
lines changed

9 files changed

+320
-23
lines changed

sentry_streams/Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sentry_streams/Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,6 @@ crate-type = ["cdylib"]
2020
[features]
2121
extension-module = ["pyo3/extension-module"]
2222
default = ["extension-module"]
23+
24+
[dev-dependencies]
25+
parking_lot = "0.12.1"

sentry_streams/src/consumer.rs

+7-6
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use pyo3::prelude::*;
1414
use pyo3::types::PyBytes;
1515
use sentry_arroyo::backends::kafka::types::KafkaPayload;
1616
use sentry_arroyo::processing::strategies::commit_offsets::CommitOffsets;
17+
use sentry_arroyo::processing::strategies::noop::Noop;
1718
use sentry_arroyo::processing::strategies::run_task::RunTask;
1819
use sentry_arroyo::processing::strategies::ProcessingStrategy;
1920
use sentry_arroyo::processing::strategies::ProcessingStrategyFactory;
@@ -131,7 +132,11 @@ fn build_chain(
131132
) -> Box<dyn ProcessingStrategy<KafkaPayload>> {
132133
let mut next = ending_strategy;
133134
for step in steps.iter().rev() {
134-
next = build(step, next);
135+
next = build(
136+
step,
137+
next,
138+
Box::new(CommitOffsets::new(Duration::from_secs(5))),
139+
);
135140
}
136141

137142
let copied_source = source.to_string();
@@ -166,11 +171,7 @@ impl ArroyoStreamingFactory {
166171

167172
impl ProcessingStrategyFactory<KafkaPayload> for ArroyoStreamingFactory {
168173
fn create(&self) -> Box<dyn ProcessingStrategy<KafkaPayload>> {
169-
build_chain(
170-
&self.source,
171-
&self.steps,
172-
Box::new(CommitOffsets::new(Duration::from_secs(5))),
173-
)
174+
build_chain(&self.source, &self.steps, Box::new(Noop {}))
174175
}
175176
}
176177
#[cfg(test)]

sentry_streams/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ mod consumer;
33
mod kafka_config;
44
mod operators;
55
mod routes;
6+
mod sinks;
67
mod transformer;
78

89
#[cfg(test)]

sentry_streams/src/operators.rs

+19-5
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1-
use crate::kafka_config::PyKafkaConsumerConfig;
1+
use crate::kafka_config::PyKafkaProducerConfig;
22
use crate::routes::{Route, RoutedValue};
3+
use crate::sinks::StreamSink;
34
use crate::transformer::build_map;
45
use pyo3::prelude::*;
6+
use sentry_arroyo::backends::kafka::producer::KafkaProducer;
7+
use sentry_arroyo::backends::kafka::types::KafkaPayload;
58
use sentry_arroyo::processing::strategies::ProcessingStrategy;
69

710
/// RuntimeOperator represents a translated step in the streaming pipeline the
@@ -30,7 +33,7 @@ pub enum RuntimeOperator {
3033
StreamSink {
3134
route: Route,
3235
topic_name: String,
33-
kafka_config: PyKafkaConsumerConfig,
36+
kafka_config: PyKafkaProducerConfig,
3437
},
3538
}
3639

@@ -78,15 +81,26 @@ impl RuntimeOperator {
7881
pub fn build(
7982
step: &Py<RuntimeOperator>,
8083
next: Box<dyn ProcessingStrategy<RoutedValue>>,
84+
terminator_strategy: Box<dyn ProcessingStrategy<KafkaPayload>>,
8185
) -> Box<dyn ProcessingStrategy<RoutedValue>> {
8286
match step.get() {
8387
RuntimeOperator::Map { function, route } => {
8488
let func_ref = Python::with_gil(|py| function.clone_ref(py));
8589
build_map(route, func_ref, next)
8690
}
87-
RuntimeOperator::StreamSink { .. } => {
88-
// Handle StreamSink step
89-
unimplemented!()
91+
RuntimeOperator::StreamSink {
92+
route,
93+
topic_name,
94+
kafka_config,
95+
} => {
96+
let producer = KafkaProducer::new(kafka_config.clone().into());
97+
Box::new(StreamSink::new(
98+
route.clone(),
99+
producer,
100+
topic_name,
101+
next,
102+
terminator_strategy,
103+
))
90104
}
91105
}
92106
}

sentry_streams/src/routes.rs

+1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ impl Route {
4747
/// Represents a message being passed between steps in the Arroyo
4848
/// consumer. All messages have a Route attached to them which
4949
/// represents the path taken by the message in the pipeline.
50+
#[derive(Debug)]
5051
pub struct RoutedValue {
5152
pub route: Route,
5253
pub payload: Py<PyAny>, // Replace Py<PyAny> with the concrete type you need

sentry_streams/src/sinks.rs

+242
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
//! This module provides an implementation of the StreamSink pipeline
2+
//! step that produces messages on a Kafka topic.
3+
//!
4+
//! Like all the strategies in the Arroyo streaming pipeline adapter,
5+
//! this checks whether a message should be processed or forwarded
6+
//! via the `Route` attribute.
7+
8+
use crate::routes::{Route, RoutedValue};
9+
use pyo3::prelude::*;
10+
use pyo3::types::PyAnyMethods;
11+
use pyo3::types::PyBytes;
12+
use sentry_arroyo::backends::kafka::types::KafkaPayload;
13+
use sentry_arroyo::backends::Producer;
14+
use sentry_arroyo::processing::strategies::MessageRejected;
15+
use sentry_arroyo::types::{Message, Topic, TopicOrPartition};
16+
17+
use sentry_arroyo::processing::strategies::produce::Produce;
18+
use sentry_arroyo::processing::strategies::run_task_in_threads::ConcurrencyConfig;
19+
use sentry_arroyo::processing::strategies::ProcessingStrategy;
20+
use sentry_arroyo::processing::strategies::SubmitError;
21+
use sentry_arroyo::processing::strategies::{merge_commit_request, CommitRequest, StrategyError};
22+
use std::time::Duration;
23+
24+
/// Turns a `Message<RoutedValue>` passed among streaming primitives into
25+
/// a `Message<KafkaPayload>` to be produced on a Kafka topic.
26+
/// This is needed because what we need to produce on a Kafka topic is
27+
/// only the payload in bytes taken from the `Message<RoutedValue>`.
28+
/// Also this has to be converted from the Python PyAny payload into
29+
/// a Vector of bytes.
30+
fn to_kafka_payload(message: Message<RoutedValue>) -> Message<KafkaPayload> {
31+
// Convert the RoutedValue to KafkaPayload
32+
// This is a placeholder implementation
33+
let payload = Python::with_gil(|py| {
34+
let payload = &message.payload().payload;
35+
if payload.is_none(py) {
36+
return KafkaPayload::new(None, None, None);
37+
}
38+
39+
let py_bytes: &Bound<PyBytes> = payload.bind(py).downcast().unwrap();
40+
let raw_bytes = py_bytes.as_bytes();
41+
KafkaPayload::new(None, None, Some(raw_bytes.to_vec()))
42+
});
43+
message.replace(payload)
44+
}
45+
46+
/// Implements the StreamSink logic.
47+
///
48+
/// This is an Arroyo strategy that contains two next steps. One
49+
/// is the Kafka producer that turns messages into KafkaPayload and
50+
/// sends them to Kafka. The second is the following step in the
51+
/// pipeline that receives all messages that have a `Route` that does
52+
/// not correspond to the one this producer is meant to process.
53+
///
54+
/// The producer is an Arroyo `Produce` strategy, which contains a
55+
/// `Producer` struct and forwards messages to a `CommitOffsets` step.
56+
/// TODO: This strategy does not guarantee at-least-once delivery as
57+
/// it does not wait for the following step to return an offset before
58+
/// committing. It commits everything the Producer produces successfully.
59+
pub struct StreamSink {
60+
/// The route this strategy processes. Every message not for this
61+
/// strategy is sent to the `next` strategy without being processed.
62+
route: Route,
63+
produce_strategy: Produce<Box<dyn ProcessingStrategy<KafkaPayload>>>,
64+
next_strategy: Box<dyn ProcessingStrategy<RoutedValue>>,
65+
66+
/// Keeps track of the last message this strategy did not manage to
67+
/// forward. This is a hack like in `RunTask` to be able to return
68+
/// the failed message to the previous strategy after it has been
69+
/// passed to the transformer function and been mutated.
70+
message_carried_over: Option<Message<KafkaPayload>>,
71+
commit_request_carried_over: Option<CommitRequest>,
72+
}
73+
74+
impl StreamSink {
75+
pub fn new(
76+
route: Route,
77+
producer: impl Producer<KafkaPayload> + 'static,
78+
topic: &str,
79+
next_strategy: Box<dyn ProcessingStrategy<RoutedValue>>,
80+
terminator_strategy: Box<dyn ProcessingStrategy<KafkaPayload>>,
81+
) -> Self {
82+
let produce_strategy = Produce::new(
83+
terminator_strategy,
84+
producer,
85+
&ConcurrencyConfig::new(1),
86+
TopicOrPartition::from(Topic::new(topic)),
87+
);
88+
89+
StreamSink {
90+
route,
91+
produce_strategy,
92+
next_strategy,
93+
message_carried_over: None,
94+
commit_request_carried_over: None,
95+
}
96+
}
97+
}
98+
99+
impl ProcessingStrategy<RoutedValue> for StreamSink {
100+
/// Polls on both the strategies downstream.
101+
/// It simply merges the commit requests coming back from them and
102+
/// returns the result.
103+
/// TODO: In order to achieve at least once delivery we should
104+
/// always only return the lowest committed offset.
105+
fn poll(&mut self) -> Result<Option<CommitRequest>, StrategyError> {
106+
let next_commit_request = self.next_strategy.poll()?;
107+
self.commit_request_carried_over =
108+
merge_commit_request(self.commit_request_carried_over.take(), next_commit_request);
109+
110+
let produce_commit_request = self.produce_strategy.poll()?;
111+
self.commit_request_carried_over = merge_commit_request(
112+
self.commit_request_carried_over.take(),
113+
produce_commit_request,
114+
);
115+
116+
if let Some(message) = self.message_carried_over.take() {
117+
match self.produce_strategy.submit(message) {
118+
Err(SubmitError::MessageRejected(MessageRejected {
119+
message: transformed_message,
120+
})) => {
121+
self.message_carried_over = Some(transformed_message);
122+
}
123+
Err(SubmitError::InvalidMessage(invalid_message)) => {
124+
return Err(invalid_message.into());
125+
}
126+
Ok(_) => {}
127+
}
128+
}
129+
130+
Ok(self.commit_request_carried_over.take())
131+
}
132+
133+
/// Submits the message to the producer if the route in the message
134+
/// corresponds to the `Route` attribute of this strategy; otherwise
135+
/// forwards it to the next strategy unchanged.
136+
fn submit(&mut self, message: Message<RoutedValue>) -> Result<(), SubmitError<RoutedValue>> {
137+
if self.message_carried_over.is_some() {
138+
return Err(SubmitError::MessageRejected(MessageRejected { message }));
139+
}
140+
141+
if self.route != message.payload().route {
142+
self.next_strategy.submit(message)
143+
} else {
144+
match self.produce_strategy.submit(to_kafka_payload(message)) {
145+
Err(SubmitError::MessageRejected(MessageRejected {
146+
message: transformed_message,
147+
})) => {
148+
self.message_carried_over = Some(transformed_message);
149+
}
150+
Err(SubmitError::InvalidMessage(invalid_message)) => {
151+
return Err(SubmitError::InvalidMessage(invalid_message));
152+
}
153+
Ok(_) => {}
154+
}
155+
Ok(())
156+
}
157+
}
158+
159+
fn terminate(&mut self) {
160+
self.produce_strategy.terminate();
161+
self.next_strategy.terminate();
162+
}
163+
164+
fn join(&mut self, timeout: Option<Duration>) -> Result<Option<CommitRequest>, StrategyError> {
165+
let next_commit = self.next_strategy.join(timeout)?;
166+
let next_commit_produce = self.produce_strategy.join(timeout)?;
167+
Ok(merge_commit_request(
168+
merge_commit_request(self.commit_request_carried_over.take(), next_commit),
169+
next_commit_produce,
170+
))
171+
}
172+
}
173+
174+
#[cfg(test)]
175+
mod tests {
176+
use super::*;
177+
use crate::routes::Route;
178+
use crate::test_operators::make_routed_msg;
179+
use chrono::Utc;
180+
use parking_lot::Mutex;
181+
use sentry_arroyo::backends::local::broker::LocalBroker;
182+
use sentry_arroyo::backends::local::LocalProducer;
183+
use sentry_arroyo::backends::storages::memory::MemoryMessageStorage;
184+
use sentry_arroyo::processing::strategies::noop::Noop;
185+
use sentry_arroyo::utils::clock::TestingClock;
186+
use std::sync::Arc;
187+
use std::time::SystemTime;
188+
189+
#[test]
190+
fn test_kafka_payload() {
191+
Python::with_gil(|py| {
192+
let message = make_routed_msg(
193+
py,
194+
PyBytes::new(py, b"test_message").into_any().unbind(),
195+
"source",
196+
vec!["waypoint1".to_string()],
197+
);
198+
let kafka_payload = to_kafka_payload(message);
199+
let py_payload = kafka_payload.payload();
200+
201+
let kafka_payload = py_payload.payload();
202+
assert!(kafka_payload.is_some());
203+
assert_eq!(kafka_payload.unwrap(), b"test_message");
204+
});
205+
}
206+
207+
#[test]
208+
fn test_other_route() {
209+
let result_topic = Topic::new("result-topic");
210+
let clock = TestingClock::new(SystemTime::now());
211+
let storage = MemoryMessageStorage::default();
212+
let mut broker = LocalBroker::new(Box::new(storage), Box::new(clock));
213+
broker.create_topic(result_topic, 1).unwrap();
214+
215+
let broker = Arc::new(Mutex::new(broker));
216+
let producer = LocalProducer::new(broker.clone());
217+
218+
let next_step = Noop {};
219+
let terminator = Noop {};
220+
221+
let mut sink = StreamSink::new(
222+
Route::new("source".to_string(), vec!["wp1".to_string()]),
223+
producer,
224+
"result-topic",
225+
Box::new(next_step),
226+
Box::new(terminator),
227+
);
228+
229+
Python::with_gil(|py| {
230+
let value = br#"{"something": "something"}"#.to_vec();
231+
let message = make_routed_msg(
232+
py,
233+
PyBytes::new(py, &value).into_any().unbind(),
234+
"source",
235+
vec![],
236+
);
237+
sink.submit(message).unwrap();
238+
239+
sink.join(None).unwrap();
240+
});
241+
}
242+
}

sentry_streams/src/test_operators.rs

+27
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use crate::routes::Route;
2+
use crate::routes::RoutedValue;
13
use pyo3::prelude::*;
24
use pyo3::IntoPyObjectExt;
35
use sentry_arroyo::backends::kafka::types::KafkaPayload;
@@ -19,3 +21,28 @@ pub fn make_msg(payload: Option<Vec<u8>>) -> Message<KafkaPayload> {
1921
std::collections::BTreeMap::new(),
2022
)
2123
}
24+
25+
#[cfg(test)]
26+
pub fn build_routed_value(
27+
py: Python<'_>,
28+
msg_payload: Py<PyAny>,
29+
source: &str,
30+
waypoints: Vec<String>,
31+
) -> RoutedValue {
32+
let route = Route::new(source.to_string(), waypoints);
33+
RoutedValue {
34+
route,
35+
payload: msg_payload,
36+
}
37+
}
38+
39+
#[cfg(test)]
40+
pub fn make_routed_msg(
41+
py: Python<'_>,
42+
msg_payload: Py<PyAny>,
43+
source: &str,
44+
waypoints: Vec<String>,
45+
) -> Message<RoutedValue> {
46+
let routed_value = build_routed_value(py, msg_payload, source, waypoints);
47+
Message::new_any_message(routed_value, std::collections::BTreeMap::new())
48+
}

0 commit comments

Comments
 (0)