Skip to content

Commit 7dadcf8

Browse files
committed
Add producer
1 parent 10e66f7 commit 7dadcf8

File tree

9 files changed

+346
-23
lines changed

9 files changed

+346
-23
lines changed

sentry_streams/Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sentry_streams/Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,6 @@ crate-type = ["cdylib"]
2020
[features]
2121
extension-module = ["pyo3/extension-module"]
2222
default = ["extension-module"]
23+
24+
[dev-dependencies]
25+
parking_lot = "0.12.1"

sentry_streams/src/consumer.rs

+7-6
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use pyo3::prelude::*;
1414
use pyo3::types::PyBytes;
1515
use sentry_arroyo::backends::kafka::types::KafkaPayload;
1616
use sentry_arroyo::processing::strategies::commit_offsets::CommitOffsets;
17+
use sentry_arroyo::processing::strategies::noop::Noop;
1718
use sentry_arroyo::processing::strategies::run_task::RunTask;
1819
use sentry_arroyo::processing::strategies::ProcessingStrategy;
1920
use sentry_arroyo::processing::strategies::ProcessingStrategyFactory;
@@ -131,7 +132,11 @@ fn build_chain(
131132
) -> Box<dyn ProcessingStrategy<KafkaPayload>> {
132133
let mut next = ending_strategy;
133134
for step in steps.iter().rev() {
134-
next = build(step, next);
135+
next = build(
136+
step,
137+
next,
138+
Box::new(CommitOffsets::new(Duration::from_secs(5))),
139+
);
135140
}
136141

137142
let copied_source = source.to_string();
@@ -166,11 +171,7 @@ impl ArroyoStreamingFactory {
166171

167172
impl ProcessingStrategyFactory<KafkaPayload> for ArroyoStreamingFactory {
168173
fn create(&self) -> Box<dyn ProcessingStrategy<KafkaPayload>> {
169-
build_chain(
170-
&self.source,
171-
&self.steps,
172-
Box::new(CommitOffsets::new(Duration::from_secs(5))),
173-
)
174+
build_chain(&self.source, &self.steps, Box::new(Noop {}))
174175
}
175176
}
176177
#[cfg(test)]

sentry_streams/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ mod consumer;
33
mod kafka_config;
44
mod operators;
55
mod routes;
6+
mod sinks;
67
mod transformer;
78

89
#[cfg(test)]

sentry_streams/src/operators.rs

+19-5
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1-
use crate::kafka_config::PyKafkaConsumerConfig;
1+
use crate::kafka_config::PyKafkaProducerConfig;
22
use crate::routes::{Route, RoutedValue};
3+
use crate::sinks::StreamSink;
34
use crate::transformer::build_map;
45
use pyo3::prelude::*;
6+
use sentry_arroyo::backends::kafka::producer::KafkaProducer;
7+
use sentry_arroyo::backends::kafka::types::KafkaPayload;
58
use sentry_arroyo::processing::strategies::ProcessingStrategy;
69

710
/// RuntimeOperator represents a translated step in the streaming pipeline the
@@ -30,7 +33,7 @@ pub enum RuntimeOperator {
3033
StreamSink {
3134
route: Route,
3235
topic_name: String,
33-
kafka_config: PyKafkaConsumerConfig,
36+
kafka_config: PyKafkaProducerConfig,
3437
},
3538
}
3639

@@ -78,15 +81,26 @@ impl RuntimeOperator {
7881
pub fn build(
7982
step: &Py<RuntimeOperator>,
8083
next: Box<dyn ProcessingStrategy<RoutedValue>>,
84+
terminator_strategy: Box<dyn ProcessingStrategy<KafkaPayload>>,
8185
) -> Box<dyn ProcessingStrategy<RoutedValue>> {
8286
match step.get() {
8387
RuntimeOperator::Map { function, route } => {
8488
let func_ref = Python::with_gil(|py| function.clone_ref(py));
8589
build_map(route, func_ref, next)
8690
}
87-
RuntimeOperator::StreamSink { .. } => {
88-
// Handle StreamSink step
89-
unimplemented!()
91+
RuntimeOperator::StreamSink {
92+
route,
93+
topic_name,
94+
kafka_config,
95+
} => {
96+
let producer = KafkaProducer::new(kafka_config.clone().into());
97+
Box::new(StreamSink::new(
98+
route.clone(),
99+
producer,
100+
topic_name,
101+
next,
102+
terminator_strategy,
103+
))
90104
}
91105
}
92106
}

sentry_streams/src/routes.rs

+1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ impl Route {
4747
/// Represents a message being passed between steps in the Arroyo
4848
/// consumer. All messages have a Route attached to them which
4949
/// represents the path taken by the message in the pipeline.
50+
#[derive(Debug)]
5051
pub struct RoutedValue {
5152
pub route: Route,
5253
pub payload: Py<PyAny>, // Replace Py<PyAny> with the concrete type you need

sentry_streams/src/sinks.rs

+268
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,268 @@
1+
//! This module provides an implementation of the StreamSink pipeline
2+
//! step that produces messages on a Kafka topic.
3+
//!
4+
//! Like all the strategies in the Arroyo streaming pipeline adapter,
5+
//! this checks whether a message should be processed or forwarded
6+
//! via the `Route` attribute.
7+
8+
use crate::routes::{Route, RoutedValue};
9+
use pyo3::prelude::*;
10+
use pyo3::types::PyAnyMethods;
11+
use pyo3::types::PyBytes;
12+
use sentry_arroyo::backends::kafka::types::KafkaPayload;
13+
use sentry_arroyo::backends::Producer;
14+
use sentry_arroyo::processing::strategies::MessageRejected;
15+
use sentry_arroyo::types::{Message, Topic, TopicOrPartition};
16+
17+
use sentry_arroyo::processing::strategies::produce::Produce;
18+
use sentry_arroyo::processing::strategies::run_task_in_threads::ConcurrencyConfig;
19+
use sentry_arroyo::processing::strategies::ProcessingStrategy;
20+
use sentry_arroyo::processing::strategies::SubmitError;
21+
use sentry_arroyo::processing::strategies::{merge_commit_request, CommitRequest, StrategyError};
22+
use std::time::Duration;
23+
24+
/// Turns a `Message<RoutedValue>` passed among streaming primitives into
25+
/// a `Message<KafkaPayload>` to be produced on a Kafka topic.
26+
/// This is needed because what we need to produce on a Kafka topic is
27+
/// only the payload in bytes taken from the `Message<RoutedValue>`.
28+
/// Also this has to be converted from the Python PyAny payload into
29+
/// a Vector of bytes.
30+
fn to_kafka_payload(message: Message<RoutedValue>) -> Message<KafkaPayload> {
31+
// Convert the RoutedValue to KafkaPayload
32+
// This is a placeholder implementation
33+
let payload = Python::with_gil(|py| {
34+
let payload = &message.payload().payload;
35+
if payload.is_none(py) {
36+
return KafkaPayload::new(None, None, None);
37+
}
38+
39+
let py_bytes: &Bound<PyBytes> = payload.bind(py).downcast().unwrap();
40+
let raw_bytes = py_bytes.as_bytes();
41+
KafkaPayload::new(None, None, Some(raw_bytes.to_vec()))
42+
});
43+
message.replace(payload)
44+
}
45+
46+
/// Implements the StreamSink logic.
47+
///
48+
/// This is an Arroyo strategy that contains two next steps. One
49+
/// is the Kafka producer that turns messages into KafkaPayload and
50+
/// sends them to Kafka. The second is the following step in the
51+
/// pipeline that receives all messages that have a `Route` that does
52+
/// not correspond to the one this producer is meant to process.
53+
///
54+
/// The producer is an Arroyo `Produce` strategy, which contains a
55+
/// `Producer` struct and forwards messages to a `CommitOffsets` step.
56+
/// TODO: This strategy does not guarantee at least once delivery as
57+
/// it does not wait for the following step to return an offset before
58+
/// committing. It commits everything the Producer produces successfully.
59+
pub struct StreamSink {
60+
/// The route this strategy processes. Every message not for this
61+
/// strategy is sent to the `next` strategy without being processed.
62+
route: Route,
63+
produce_strategy: Produce<Box<dyn ProcessingStrategy<KafkaPayload>>>,
64+
next_strategy: Box<dyn ProcessingStrategy<RoutedValue>>,
65+
66+
/// Keeps track of the last message this strategy did not manage to
67+
/// forward. This is a hack like in `RunTask` to be able to return
68+
/// the failed message to the previous strategy after it has been
69+
/// passed to the transformer function and been mutated.
70+
message_carried_over: Option<Message<KafkaPayload>>,
71+
commit_request_carried_over: Option<CommitRequest>,
72+
}
73+
74+
impl StreamSink {
75+
pub fn new(
76+
route: Route,
77+
producer: impl Producer<KafkaPayload> + 'static,
78+
topic: &str,
79+
next_strategy: Box<dyn ProcessingStrategy<RoutedValue>>,
80+
terminator_strategy: Box<dyn ProcessingStrategy<KafkaPayload>>,
81+
) -> Self {
82+
let produce_strategy = Produce::new(
83+
terminator_strategy,
84+
producer,
85+
&ConcurrencyConfig::new(1),
86+
TopicOrPartition::from(Topic::new(topic)),
87+
);
88+
89+
StreamSink {
90+
route,
91+
produce_strategy,
92+
next_strategy,
93+
message_carried_over: None,
94+
commit_request_carried_over: None,
95+
}
96+
}
97+
}
98+
99+
impl ProcessingStrategy<RoutedValue> for StreamSink {
100+
/// Polls on both the strategies downstream.
101+
/// It simply merges the commit requests coming back from them and
102+
/// returns the result.
103+
/// TODO: In order to achieve at least once delivery we should
104+
/// always only return the lowest committed offset.
105+
fn poll(&mut self) -> Result<Option<CommitRequest>, StrategyError> {
106+
let next_commit_request = self.next_strategy.poll()?;
107+
self.commit_request_carried_over =
108+
merge_commit_request(self.commit_request_carried_over.take(), next_commit_request);
109+
110+
let produce_commit_request = self.produce_strategy.poll()?;
111+
self.commit_request_carried_over = merge_commit_request(
112+
self.commit_request_carried_over.take(),
113+
produce_commit_request,
114+
);
115+
116+
if let Some(message) = self.message_carried_over.take() {
117+
match self.produce_strategy.submit(message) {
118+
Err(SubmitError::MessageRejected(MessageRejected {
119+
message: transformed_message,
120+
})) => {
121+
self.message_carried_over = Some(transformed_message);
122+
}
123+
Err(SubmitError::InvalidMessage(invalid_message)) => {
124+
return Err(invalid_message.into());
125+
}
126+
Ok(_) => {}
127+
}
128+
}
129+
130+
Ok(self.commit_request_carried_over.take())
131+
}
132+
133+
/// Submit the message to the producer if the route in the message
134+
/// corresponds to the `Route` attribute of this strategy. Messages
135+
/// on any other route are forwarded to the next strategy untouched.
136+
fn submit(&mut self, message: Message<RoutedValue>) -> Result<(), SubmitError<RoutedValue>> {
137+
if self.message_carried_over.is_some() {
138+
return Err(SubmitError::MessageRejected(MessageRejected { message }));
139+
}
140+
141+
if self.route != message.payload().route {
142+
self.next_strategy.submit(message)
143+
} else {
144+
match self.produce_strategy.submit(to_kafka_payload(message)) {
145+
Err(SubmitError::MessageRejected(MessageRejected {
146+
message: transformed_message,
147+
})) => {
148+
self.message_carried_over = Some(transformed_message);
149+
}
150+
Err(SubmitError::InvalidMessage(invalid_message)) => {
151+
return Err(SubmitError::InvalidMessage(invalid_message));
152+
}
153+
Ok(_) => {}
154+
}
155+
Ok(())
156+
}
157+
}
158+
159+
fn terminate(&mut self) {
160+
self.produce_strategy.terminate();
161+
self.next_strategy.terminate();
162+
}
163+
164+
fn join(&mut self, timeout: Option<Duration>) -> Result<Option<CommitRequest>, StrategyError> {
165+
let next_commit = self.next_strategy.join(timeout)?;
166+
let next_commit_produce = self.produce_strategy.join(timeout)?;
167+
Ok(merge_commit_request(
168+
merge_commit_request(self.commit_request_carried_over.take(), next_commit),
169+
next_commit_produce,
170+
))
171+
}
172+
}
173+
174+
#[cfg(test)]
175+
mod tests {
176+
use super::*;
177+
use crate::fake_strategy::assert_messages_match;
178+
use crate::fake_strategy::FakeStrategy;
179+
use crate::routes::Route;
180+
use crate::test_operators::make_routed_msg;
181+
use parking_lot::Mutex;
182+
use sentry_arroyo::backends::local::broker::LocalBroker;
183+
use sentry_arroyo::backends::local::LocalProducer;
184+
use sentry_arroyo::backends::storages::memory::MemoryMessageStorage;
185+
use sentry_arroyo::processing::strategies::noop::Noop;
186+
use sentry_arroyo::utils::clock::TestingClock;
187+
use std::ops::Deref;
188+
use std::sync::Arc;
189+
use std::sync::Mutex as RawMutex;
190+
use std::time::SystemTime;
191+
192+
#[test]
193+
fn test_kafka_payload() {
194+
Python::with_gil(|py| {
195+
let message = make_routed_msg(
196+
py,
197+
PyBytes::new(py, b"test_message").into_any().unbind(),
198+
"source",
199+
vec!["waypoint1".to_string()],
200+
);
201+
let kafka_payload = to_kafka_payload(message);
202+
let py_payload = kafka_payload.payload();
203+
204+
let kafka_payload = py_payload.payload();
205+
assert!(kafka_payload.is_some());
206+
assert_eq!(kafka_payload.unwrap(), b"test_message");
207+
});
208+
}
209+
210+
#[test]
211+
fn test_route() {
212+
let result_topic = Topic::new("result-topic");
213+
let mut broker = LocalBroker::new(
214+
Box::new(MemoryMessageStorage::default()),
215+
Box::new(TestingClock::new(SystemTime::now())),
216+
);
217+
broker.create_topic(result_topic, 1).unwrap();
218+
219+
let broker = Arc::new(Mutex::new(broker));
220+
let producer = LocalProducer::new(broker.clone());
221+
222+
let submitted_messages = Arc::new(RawMutex::new(Vec::new()));
223+
let submitted_messages_clone = submitted_messages.clone();
224+
let next_step = FakeStrategy {
225+
submitted: submitted_messages,
226+
};
227+
let terminator = Noop {};
228+
229+
let mut sink = StreamSink::new(
230+
Route::new("source".to_string(), vec!["wp1".to_string()]),
231+
producer,
232+
"result-topic",
233+
Box::new(next_step),
234+
Box::new(terminator),
235+
);
236+
237+
Python::with_gil(|py| {
238+
let value = b"test_message";
239+
let message = make_routed_msg(
240+
py,
241+
PyBytes::new(py, value).into_any().unbind(),
242+
"source",
243+
vec![],
244+
);
245+
sink.submit(message).unwrap();
246+
sink.join(None).unwrap();
247+
248+
{
249+
let expected_messages = vec![PyBytes::new(py, b"test_message").into_any().unbind()];
250+
let actual_messages = submitted_messages_clone.lock().unwrap();
251+
assert_messages_match(py, expected_messages, actual_messages.deref());
252+
} // Unlock the MutexGuard around `actual_messages`
253+
254+
// Try to send to the producer
255+
// No new message on the next strategy.
256+
let message = make_routed_msg(
257+
py,
258+
PyBytes::new(py, value).into_any().unbind(),
259+
"source",
260+
vec!["wp1".to_string()],
261+
);
262+
sink.submit(message).unwrap();
263+
let expected_messages = vec![PyBytes::new(py, b"test_message").into_any().unbind()];
264+
let actual_messages = submitted_messages_clone.lock().unwrap();
265+
assert_messages_match(py, expected_messages, actual_messages.deref());
266+
});
267+
}
268+
}

0 commit comments

Comments
 (0)