8
8
use crate :: kafka_config:: PyKafkaConsumerConfig ;
9
9
use crate :: operators:: build;
10
10
use crate :: operators:: RuntimeOperator ;
11
+ use crate :: routes:: Route ;
12
+ use crate :: routes:: RoutedValue ;
11
13
use pyo3:: prelude:: * ;
12
14
use pyo3:: types:: PyBytes ;
13
15
use sentry_arroyo:: backends:: kafka:: types:: KafkaPayload ;
@@ -39,6 +41,8 @@ pub struct ArroyoConsumer {
39
41
40
42
topic : String ,
41
43
44
+ source : String ,
45
+
42
46
steps : Vec < Py < RuntimeOperator > > ,
43
47
44
48
/// The ProcessorHandle allows the main thread to stop the StreamProcessor
@@ -49,10 +53,11 @@ pub struct ArroyoConsumer {
49
53
#[ pymethods]
50
54
impl ArroyoConsumer {
51
55
#[ new]
52
- fn new ( kafka_config : PyKafkaConsumerConfig , topic : String ) -> Self {
56
+ fn new ( source : String , kafka_config : PyKafkaConsumerConfig , topic : String ) -> Self {
53
57
ArroyoConsumer {
54
58
consumer_config : kafka_config,
55
59
topic,
60
+ source,
56
61
steps : Vec :: new ( ) ,
57
62
handle : None ,
58
63
}
@@ -71,13 +76,8 @@ impl ArroyoConsumer {
71
76
fn run ( & mut self ) {
72
77
tracing_subscriber:: fmt:: init ( ) ;
73
78
println ! ( "Running Arroyo Consumer..." ) ;
74
- let steps_copy = Python :: with_gil ( |py| {
75
- self . steps
76
- . iter ( )
77
- . map ( |step| step. clone_ref ( py) )
78
- . collect :: < Vec < _ > > ( )
79
- } ) ;
80
- let factory = ArroyoStreamingFactory { steps : steps_copy } ;
79
+
80
+ let factory = ArroyoStreamingFactory :: new ( self . source . clone ( ) , & self . steps ) ;
81
81
let config = self . consumer_config . clone ( ) . into ( ) ;
82
82
let processor = StreamProcessor :: with_kafka ( config, factory, Topic :: new ( & self . topic ) , None ) ;
83
83
self . handle = Some ( processor. get_handle ( ) ) ;
@@ -89,7 +89,7 @@ impl ArroyoConsumer {
89
89
} )
90
90
. expect ( "Error setting Ctrl+C handler" ) ;
91
91
92
- processor. run ( ) ;
92
+ let _ = processor. run ( ) ;
93
93
}
94
94
95
95
fn shutdown ( & mut self ) {
@@ -100,85 +100,126 @@ impl ArroyoConsumer {
100
100
}
101
101
}
102
102
103
- /// Converts a Message<KafkaPayload> to a Message<Py<PyAny>>.
104
- /// It takes the Kafka payload as bytes and turns it into a
105
- /// Python bytes object.
106
- fn to_python ( message : Message < KafkaPayload > ) -> Message < Py < PyAny > > {
107
- let payload = Python :: with_gil ( |py| {
108
- let payload = message. payload ( ) . payload ( ) . unwrap ( ) ;
109
- let py_bytes = PyBytes :: new ( py, payload) ;
110
- py_bytes. into_any ( ) . unbind ( )
103
+ /// Converts a Message<KafkaPayload> to a Message<RoutedValue>.
104
+ ///
105
+ /// The messages we send around between steps in the pipeline contain
106
+ /// the `Route` object that represents the path the message took when
107
+ /// going through branches.
108
+ /// The message coming from Kafka is a Message<KafkaPayload>, so we need
109
+ /// to turn the content into PyBytes for Python to manage the content
110
+ /// and we need to wrap the message into a RoutedValue object.
111
+ fn to_routed_value ( source : & str , message : Message < KafkaPayload > ) -> Message < RoutedValue > {
112
+ let payload = Python :: with_gil ( |py| match message. payload ( ) . payload ( ) {
113
+ Some ( payload) => PyBytes :: new ( py, payload) . into_any ( ) . unbind ( ) ,
114
+ None => py. None ( ) ,
111
115
} ) ;
112
- message. replace ( payload)
116
+ let route = Route :: new ( source. to_string ( ) , vec ! [ ] ) ;
117
+ message. replace ( RoutedValue { route, payload } )
113
118
}
114
119
115
120
/// Builds the Arroyo ProcessingStrategy chain for this consumer.
116
- /// It plugs a Commit policy at the end and a translator at the beginning
117
- /// that takes the payload of the Kafka message and turns it into a Py<PyAny>
118
- fn build_chain ( steps : & [ Py < RuntimeOperator > ] ) -> Box < dyn ProcessingStrategy < KafkaPayload > > {
119
- let mut next: Box < dyn ProcessingStrategy < Py < PyAny > > > =
120
- Box :: new ( CommitOffsets :: new ( Duration :: from_secs ( 5 ) ) ) ;
121
+ ///
122
+ /// It wires up all the operators added to the consumer object,
123
+ /// it prefixes the chain with a step that converts the Message<KafkaPayload>
124
+ /// to a Message<RoutedValue> and it adds a termination step provided
125
+ /// by the caller. This is generally a CommitOffsets step but it can
126
+ /// be customized.
127
+ fn build_chain (
128
+ source : & str ,
129
+ steps : & [ Py < RuntimeOperator > ] ,
130
+ ending_strategy : Box < dyn ProcessingStrategy < RoutedValue > > ,
131
+ ) -> Box < dyn ProcessingStrategy < KafkaPayload > > {
132
+ let mut next = ending_strategy;
121
133
for step in steps. iter ( ) . rev ( ) {
122
134
next = build ( step, next) ;
123
135
}
124
136
125
- let converter = RunTask :: new (
126
- |message : Message < KafkaPayload > | Ok ( to_python ( message) ) ,
127
- next,
128
- ) ;
137
+ let copied_source = source. to_string ( ) ;
138
+ let conversion_function =
139
+ move |message : Message < KafkaPayload > | Ok ( to_routed_value ( & copied_source, message) ) ;
140
+
141
+ let converter = RunTask :: new ( conversion_function, next) ;
129
142
130
143
Box :: new ( converter)
131
144
}
132
145
133
146
struct ArroyoStreamingFactory {
147
+ source : String ,
134
148
steps : Vec < Py < RuntimeOperator > > ,
135
149
}
136
150
151
+ impl ArroyoStreamingFactory {
152
+ /// Creates a new instance of ArroyoStreamingFactory.
153
+ fn new ( source : String , steps : & [ Py < RuntimeOperator > ] ) -> Self {
154
+ let steps_copy = Python :: with_gil ( |py| {
155
+ steps
156
+ . iter ( )
157
+ . map ( |step| step. clone_ref ( py) )
158
+ . collect :: < Vec < _ > > ( )
159
+ } ) ;
160
+ ArroyoStreamingFactory {
161
+ source,
162
+ steps : steps_copy,
163
+ }
164
+ }
165
+ }
166
+
137
167
impl ProcessingStrategyFactory < KafkaPayload > for ArroyoStreamingFactory {
138
168
fn create ( & self ) -> Box < dyn ProcessingStrategy < KafkaPayload > > {
139
- build_chain ( & self . steps )
169
+ build_chain (
170
+ & self . source ,
171
+ & self . steps ,
172
+ Box :: new ( CommitOffsets :: new ( Duration :: from_secs ( 5 ) ) ) ,
173
+ )
140
174
}
141
175
}
142
176
#[ cfg( test) ]
143
177
mod tests {
144
178
use super :: * ;
179
+ use crate :: fake_strategy:: assert_messages_match;
180
+ use crate :: fake_strategy:: FakeStrategy ;
145
181
use crate :: operators:: RuntimeOperator ;
146
182
use crate :: routes:: Route ;
183
+ use crate :: test_operators:: make_lambda;
184
+ use crate :: test_operators:: make_msg;
147
185
use pyo3:: ffi:: c_str;
148
186
use pyo3:: IntoPyObjectExt ;
149
- use std:: collections:: BTreeMap ;
187
+ use std:: ops:: Deref ;
188
+ use std:: sync:: { Arc , Mutex } ;
150
189
151
190
#[ test]
152
- fn test_to_python ( ) {
191
+ fn test_to_routed_value ( ) {
153
192
Python :: with_gil ( |py| {
154
193
let payload_data = b"test_payload" ;
155
- let message = Message :: new_any_message (
156
- KafkaPayload :: new ( None , None , Some ( payload_data. to_vec ( ) ) ) ,
157
- BTreeMap :: new ( ) ,
158
- ) ;
194
+ let message = make_msg ( Some ( payload_data. to_vec ( ) ) ) ;
159
195
160
- let python_message = to_python ( message) ;
196
+ let python_message = to_routed_value ( "source" , message) ;
161
197
162
198
let py_payload = python_message. payload ( ) ;
163
199
164
- let down: & Bound < PyBytes > = py_payload. bind ( py) . downcast ( ) . unwrap ( ) ;
200
+ let down: & Bound < PyBytes > = py_payload. payload . bind ( py) . downcast ( ) . unwrap ( ) ;
165
201
let payload_bytes: & [ u8 ] = down. as_bytes ( ) ;
166
202
assert_eq ! ( payload_bytes, payload_data) ;
203
+ assert_eq ! ( py_payload. route. source, "source" ) ;
204
+ assert_eq ! ( py_payload. route. waypoints. len( ) , 0 ) ;
205
+ } ) ;
206
+ }
207
+
208
+ #[ test]
209
+ fn test_to_none_python ( ) {
210
+ Python :: with_gil ( |py| {
211
+ let message = make_msg ( None ) ;
212
+ let python_message = to_routed_value ( "source" , message) ;
213
+ let py_payload = & python_message. payload ( ) . payload ;
214
+
215
+ assert ! ( py_payload. is_none( py) ) ;
167
216
} ) ;
168
217
}
169
218
170
219
#[ test]
171
220
fn test_build_chain ( ) {
172
221
Python :: with_gil ( |py| {
173
- let callable = py
174
- . eval (
175
- c_str ! ( "lambda x: x.decode('utf-8') + '_transformed'" ) ,
176
- None ,
177
- None ,
178
- )
179
- . unwrap ( )
180
- . into_py_any ( py)
181
- . unwrap ( ) ;
222
+ let callable = make_lambda ( py, c_str ! ( "lambda x: x.decode('utf-8') + '_transformed'" ) ) ;
182
223
183
224
let mut steps: Vec < Py < RuntimeOperator > > = vec ! [ ] ;
184
225
@@ -192,12 +233,22 @@ mod tests {
192
233
. unwrap ( ) ;
193
234
steps. push ( r) ;
194
235
195
- let mut chain = build_chain ( & steps) ;
196
- let message = Message :: new_any_message (
197
- KafkaPayload :: new ( None , None , Some ( b"test_payload" . to_vec ( ) ) ) ,
198
- BTreeMap :: new ( ) ,
199
- ) ;
236
+ let submitted_messages = Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ;
237
+ let submitted_messages_clone = submitted_messages. clone ( ) ;
238
+ let next_step = FakeStrategy {
239
+ submitted : submitted_messages,
240
+ } ;
241
+
242
+ let mut chain = build_chain ( "source" , & steps, Box :: new ( next_step) ) ;
243
+ let message = make_msg ( Some ( b"test_payload" . to_vec ( ) ) ) ;
244
+
200
245
chain. submit ( message) . unwrap ( ) ;
246
+
247
+ let value = "test_payload_transformed" . into_py_any ( py) . unwrap ( ) ;
248
+ let expected_messages = vec ! [ value] ;
249
+ let actual_messages = submitted_messages_clone. lock ( ) . unwrap ( ) ;
250
+
251
+ assert_messages_match ( py, expected_messages, actual_messages. deref ( ) ) ;
201
252
} )
202
253
}
203
254
}
0 commit comments