Skip to content

Commit

Permalink
pipeline class documented
Browse files Browse the repository at this point in the history
  • Loading branch information
echang-dtv committed Aug 4, 2021
1 parent da14479 commit 77fa23a
Showing 1 changed file with 16 additions and 7 deletions.
23 changes: 16 additions & 7 deletions src/normandy/engine/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@

class pipeline:
# Main class - Is in charge od the orchestry of processes.

from datetime import datetime
from multiprocessing import Pool
Expand All @@ -12,21 +13,25 @@ def __init__(self, env, tags):
self.tags = set(tags)
with open("pipeline/pipeline_conf.yml") as file:
confs = load(file, Loader=SafeLoader)

# Define flows with the givens tags
self.__flows__ = [self.flow(data, flow_name, self.tags) for flow_name, data in confs["flows"].items() if self.tags.intersection(set(data["tags"]))]

# Read the project configurations
self.__confs__ = confs["confs"]
self.__confs__["active_env"] = env
self.__log_level__ = 0

def set_path(self):
import os

os.chdir(self.__confs__["path"])
return
def get_path(self):
# Return the project path
return self.__confs__['path']

def get_env_confs(self):
# Return the active enviroment configurations
return self.__confs__["envs"][self.__confs__["active_env"]]

def __step_runner__(self, step):
# Run each step using parallel processing over each process.
_t = self.datetime.now()
log = self.logger(step, self.__log_level__)
with self.Pool(step.processes_number()) as pool:
Expand All @@ -38,9 +43,11 @@ def __step_runner__(self, step):
log.info(f"Step successfully ended at {self.datetime.now()} - Step time: {self.datetime.now() - _t}")

def __process_runner__(self, process):
# Run the selected process
process.execute(self)

def __flow_runner__(self, flow):
# Run the defined data flow, step by step

_t = self.datetime.now()
log = self.main_logger(flow, self.__log_level__)
Expand All @@ -54,6 +61,7 @@ def __flow_runner__(self, flow):
log.write(f"flow successfully ended at {self.datetime.now()} - Flow time: {self.datetime.now() - _t}")

def start_pipeline(self):
# Execute the pipeline
for fl in self.__flows__:
self.__flow_runner__(fl)

Expand Down Expand Up @@ -121,11 +129,12 @@ def execute(self, pipe):
_t = datetime.now()
log = logger(self, pipe.__log_level__)

spec = spec_from_file_location(f"{self.__from_step__}.{self.__name__}", f"{pipe.__confs__['path']}/pipeline/{self.__from_step__}/{self.__name__}.py")
# Import and execute the user define module
spec = spec_from_file_location(f"{self.__from_step__}.{self.__name__}", f"{pipe.get_path()}/pipeline/{self.__from_step__}/{self.__name__}.py")
module = module_from_spec(spec)
spec.loader.exec_module(module)
try:
exit_vars = module.process(pipe, log)
module.process(pipe, log)
except Exception as e:
if not self.__error_tolerance__:
raise step_error("Step exception without errors tolerance")
Expand Down

0 comments on commit 77fa23a

Please sign in to comment.