Skip to content

Commit eeff642

Browse files
authored
Refactor the BasePipeline, move out all Project related logic #1351 (#1358)
Signed-off-by: tdruez <[email protected]>
1 parent c83f677 commit eeff642

18 files changed

+378
-213
lines changed

Diff for: aboutcode/pipeline/README.md

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
# `aboutcode.pipeline`
2+
3+
Define and run pipelines.
4+
5+
### Install
6+
7+
```bash
8+
pip install aboutcode_pipeline
9+
```
10+
11+
### Define and execute a pipeline
12+
13+
```python
14+
from aboutcode.pipeline import BasePipeline
15+
16+
class PrintMessages(BasePipeline):
17+
@classmethod
18+
def steps(cls):
19+
return (cls.step1,)
20+
21+
def step1(self):
22+
print("Message from step1")
23+
24+
PrintMessages().execute()
25+
```
26+
27+
### Groups and steps selection
28+
29+
```python
30+
from aboutcode.pipeline import BasePipeline
31+
from aboutcode.pipeline import group
32+
33+
class PrintMessages(BasePipeline):
34+
@classmethod
35+
def steps(cls):
36+
return (cls.step1, cls.step2)
37+
38+
def step1(self):
39+
print("Message from step1")
40+
41+
@group("foo")
42+
def step2(self):
43+
print("Message from step2")
44+
45+
46+
# Execute pipeline with group selection
47+
run = PrintMessages(selected_groups=["foo"])
48+
exitcode, error = run.execute()
49+
50+
# Execute pipeline with steps selection
51+
run = PrintMessages(selected_steps=["step1"])
52+
exitcode, error = run.execute()
53+
```

Diff for: aboutcode/pipeline/__init__.py

+151-49
Original file line numberDiff line numberDiff line change
@@ -28,49 +28,20 @@
2828
from pydoc import splitdoc
2929
from timeit import default_timer as timer
3030

31-
logger = logging.getLogger(__name__)
31+
module_logger = logging.getLogger(__name__)
3232

3333

34-
def group(*groups):
35-
"""Mark a function as part of a particular group."""
36-
37-
def decorator(obj):
38-
if hasattr(obj, "groups"):
39-
obj.groups = obj.groups.union(groups)
40-
else:
41-
setattr(obj, "groups", set(groups))
42-
return obj
43-
44-
return decorator
45-
46-
47-
def humanize_time(seconds):
48-
"""Convert the provided ``seconds`` number into human-readable time."""
49-
message = f"{seconds:.0f} seconds"
50-
51-
if seconds > 86400:
52-
message += f" ({seconds / 86400:.1f} days)"
53-
if seconds > 3600:
54-
message += f" ({seconds / 3600:.1f} hours)"
55-
elif seconds > 60:
56-
message += f" ({seconds / 60:.1f} minutes)"
57-
58-
return message
59-
60-
61-
class BasePipeline:
62-
"""Base class for all pipeline implementations."""
34+
class PipelineDefinition:
35+
"""
36+
Encapsulate the code related to a Pipeline definition:
37+
- Steps
38+
- Attributes
39+
- Documentation
40+
"""
6341

6442
# Flag indicating if the Pipeline is an add-on, meaning it cannot be run first.
6543
is_addon = False
6644

67-
def __init__(self, run):
68-
"""Load the Run and Project instances."""
69-
self.run = run
70-
self.project = run.project
71-
self.pipeline_name = run.pipeline_name
72-
self.env = self.project.get_env()
73-
7445
@classmethod
7546
def steps(cls):
7647
raise NotImplementedError
@@ -89,6 +60,9 @@ def get_steps(cls, groups=None):
8960

9061
steps = cls.steps()
9162

63+
if initial_steps := cls.get_initial_steps():
64+
steps = (*initial_steps, *steps)
65+
9266
if groups is not None:
9367
steps = tuple(
9468
step
@@ -152,13 +126,40 @@ def get_available_groups(cls):
152126
)
153127
)
154128

129+
130+
class PipelineRun:
131+
"""
132+
Encapsulate the code related to a Pipeline run (execution):
133+
- Execution context: groups, steps
134+
- Execution logic
135+
- Logging
136+
- Results
137+
"""
138+
139+
def __init__(self, selected_groups=None, selected_steps=None):
140+
"""Load the Pipeline class."""
141+
self.pipeline_class = self.__class__
142+
self.pipeline_name = self.__class__.__name__
143+
144+
self.selected_groups = selected_groups
145+
self.selected_steps = selected_steps or []
146+
147+
self.execution_log = []
148+
self.current_step = ""
149+
150+
def append_to_log(self, message):
151+
self.execution_log.append(message)
152+
153+
def set_current_step(self, message):
154+
self.current_step = message
155+
155156
def log(self, message):
156-
"""Log the given `message` to the current module logger and Run instance."""
157+
"""Log the given `message` to the current module logger and execution_log."""
157158
now_local = datetime.now(timezone.utc).astimezone()
158159
timestamp = now_local.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
159160
message = f"{timestamp} {message}"
160-
logger.info(message)
161-
self.run.append_to_log(message)
161+
module_logger.info(message)
162+
self.append_to_log(message)
162163

163164
@staticmethod
164165
def output_from_exception(exception):
@@ -177,23 +178,18 @@ def execute(self):
177178
"""Execute each steps in the order defined on this pipeline class."""
178179
self.log(f"Pipeline [{self.pipeline_name}] starting")
179180

180-
steps = self.get_steps(groups=self.run.selected_groups)
181-
selected_steps = self.run.selected_steps
182-
183-
if initial_steps := self.get_initial_steps():
184-
steps = initial_steps + steps
185-
181+
steps = self.pipeline_class.get_steps(groups=self.selected_groups)
186182
steps_count = len(steps)
187183
pipeline_start_time = timer()
188184

189185
for current_index, step in enumerate(steps, start=1):
190186
step_name = step.__name__
191187

192-
if selected_steps and step_name not in selected_steps:
188+
if self.selected_steps and step_name not in self.selected_steps:
193189
self.log(f"Step [{step_name}] skipped")
194190
continue
195191

196-
self.run.set_current_step(f"{current_index}/{steps_count} {step_name}")
192+
self.set_current_step(f"{current_index}/{steps_count} {step_name}")
197193
self.log(f"Step [{step_name}] starting")
198194
step_start_time = timer()
199195

@@ -206,8 +202,114 @@ def execute(self):
206202
step_run_time = timer() - step_start_time
207203
self.log(f"Step [{step_name}] completed in {humanize_time(step_run_time)}")
208204

209-
self.run.set_current_step("") # Reset the `current_step` field on completion
205+
self.set_current_step("") # Reset the `current_step` field on completion
210206
pipeline_run_time = timer() - pipeline_start_time
211207
self.log(f"Pipeline completed in {humanize_time(pipeline_run_time)}")
212208

213209
return 0, ""
210+
211+
212+
class BasePipeline(PipelineDefinition, PipelineRun):
213+
"""
214+
Base class for all pipeline implementations.
215+
It combines the pipeline definition and execution logics.
216+
"""
217+
218+
219+
def group(*groups):
220+
"""Mark a function as part of a particular group."""
221+
222+
def decorator(obj):
223+
if hasattr(obj, "groups"):
224+
obj.groups = obj.groups.union(groups)
225+
else:
226+
setattr(obj, "groups", set(groups))
227+
return obj
228+
229+
return decorator
230+
231+
232+
def humanize_time(seconds):
233+
"""Convert the provided ``seconds`` number into human-readable time."""
234+
message = f"{seconds:.0f} seconds"
235+
236+
if seconds > 86400:
237+
message += f" ({seconds / 86400:.1f} days)"
238+
if seconds > 3600:
239+
message += f" ({seconds / 3600:.1f} hours)"
240+
elif seconds > 60:
241+
message += f" ({seconds / 60:.1f} minutes)"
242+
243+
return message
244+
245+
246+
class LoopProgress:
247+
"""
248+
A context manager for logging progress in loops.
249+
250+
Usage::
251+
total_iterations = 100
252+
logger = print # Replace with your actual logger function
253+
254+
progress = LoopProgress(total_iterations, logger, progress_step=10)
255+
for item in progress.iter(iterator):
256+
"Your processing logic here"
257+
258+
# As a context manager
259+
with LoopProgress(total_iterations, logger, progress_step=10) as progress:
260+
for item in progress.iter(iterator):
261+
"Your processing logic here"
262+
"""
263+
264+
def __init__(self, total_iterations, logger, progress_step=10):
265+
self.total_iterations = total_iterations
266+
self.logger = logger
267+
self.progress_step = progress_step
268+
self.start_time = timer()
269+
self.last_logged_progress = 0
270+
self.current_iteration = 0
271+
272+
def get_eta(self, current_progress):
273+
run_time = timer() - self.start_time
274+
return round(run_time / current_progress * (100 - current_progress))
275+
276+
@property
277+
def current_progress(self):
278+
return int((self.current_iteration / self.total_iterations) * 100)
279+
280+
@property
281+
def eta(self):
282+
run_time = timer() - self.start_time
283+
return round(run_time / self.current_progress * (100 - self.current_progress))
284+
285+
def log_progress(self):
286+
reasons_to_skip = [
287+
not self.logger,
288+
not self.current_iteration > 0,
289+
self.total_iterations <= self.progress_step,
290+
]
291+
if any(reasons_to_skip):
292+
return
293+
294+
if self.current_progress >= self.last_logged_progress + self.progress_step:
295+
msg = (
296+
f"Progress: {self.current_progress}% "
297+
f"({self.current_iteration}/{self.total_iterations})"
298+
)
299+
if eta := self.eta:
300+
msg += f" ETA: {humanize_time(eta)}"
301+
302+
self.logger(msg)
303+
self.last_logged_progress = self.current_progress
304+
305+
def __enter__(self):
306+
return self
307+
308+
def __exit__(self, exc_type, exc_value, traceback):
309+
pass
310+
311+
def iter(self, iterator):
312+
for item in iterator:
313+
self.current_iteration += 1
314+
self.log_progress()
315+
yield item

0 commit comments

Comments
 (0)