@@ -92,7 +92,7 @@ def test_tasks_written_once_during_rebalancing() -> None:
92
92
update_topic_partitions (topic_name , num_partitions )
93
93
94
94
# Create config files for consumers
95
- print ("Creating config files for consumers" )
95
+ print ("\n Creating config files for consumers" )
96
96
TESTS_OUTPUT_PATH .mkdir (exist_ok = True )
97
97
consumer_configs = {}
98
98
for i in range (num_consumers ):
@@ -203,41 +203,29 @@ def test_tasks_written_once_during_rebalancing() -> None:
203
203
)
204
204
consumers_have_data = consumers_have_data and res >= max_pending_count // 3
205
205
206
- consumers_have_error = False
206
+ consumer_error_logs = []
207
207
for i in range (num_consumers ):
208
208
with open (str (TESTS_OUTPUT_PATH / f"consumer_{ i } _{ curr_time } .log" ), "r" ) as f :
209
- consumers_have_error = consumers_have_error or "[31mERROR" in f .read ()
209
+ lines = f .readlines ()
210
+ for log_line_index , line in enumerate (lines ):
211
+ if "[31mERROR" in line :
212
+ # If there is an error in log file, capture 10 lines before and after the error line
213
+ consumer_error_logs .append (f"Error found in consumer_{ i } . Logging 10 lines before and after the error line:" )
214
+ for j in range (max (0 , log_line_index - 10 ), min (len (lines ) - 1 , log_line_index + 10 )):
215
+ consumer_error_logs .append (lines [j ].strip ())
216
+ consumer_error_logs .append ("" )
210
217
211
218
if not all ([row [3 ] == 0 for row in row_count ]):
212
- print ("Test failed! Got duplicate/missing kafka messages in sqlite" )
219
+ print ("\n Test failed! Got duplicate/missing kafka messages in sqlite" )
213
220
214
221
if not consumers_have_data :
215
- print ("Test failed! Lower than expected amount of kafka messages in sqlite" )
216
-
217
- if consumers_have_error :
218
- print ("Test failed! Errors in consumer logs" )
219
-
220
- if (
221
- not all ([row [3 ] == 0 for row in row_count ])
222
- or not consumers_have_data
223
- or consumers_have_error
224
- ):
225
- print ()
226
- print ("Dumping logs" )
227
- print ()
228
- for i in range (num_consumers ):
229
- print (f"=== consumer { i } log ===" )
230
- with open (
231
- str (TESTS_OUTPUT_PATH / f"consumer_{ i } _{ curr_time } .log" ), "r"
232
- ) as f :
233
- print (f .read ())
234
-
235
- # Clean up test output files
236
- print (f"Cleaning up test output files in { TESTS_OUTPUT_PATH } " )
237
- shutil .rmtree (TESTS_OUTPUT_PATH )
238
-
239
- assert (
240
- all ([row [3 ] == 0 for row in row_count ])
241
- and consumers_have_data
242
- and not consumers_have_error
243
- )
222
+ print ("\n Test failed! Lower than expected amount of kafka messages in sqlite" )
223
+
224
+ if consumer_error_logs :
225
+ print ("\n Test failed! Errors in consumer logs" )
226
+ for log in consumer_error_logs :
227
+ print (log )
228
+
229
+ assert all ([row [3 ] == 0 for row in row_count ])
230
+ assert consumers_have_data
231
+ assert not consumer_error_logs
0 commit comments