Merge pull request #17 from echang1802/documentation
Documentation
echang1802 authored Aug 4, 2021
2 parents 48613b9 + 77fa23a commit b89cb86
Showing 2 changed files with 52 additions and 24 deletions.
53 changes: 36 additions & 17 deletions README.md
@@ -5,13 +5,17 @@ by [Epsilon DataLabs](https://echang1802.github.io/epsilon.github.io)

----------------------------------------------------------------

Normandy is a data treatment framework, which main objective is standardizing your team code and provide a data treatment methodology flexible to your team needs.
Normandy is a Python framework for data pipelines whose main objective is to standardize your team's code and provide a data treatment methodology flexible enough to fit your team's needs.

----------------------------------------------------------------

## Implementing Normandy
## Installing Normandy

We are currently working in make Normandy a fully installable package, meanwhile, you may implement Normandy _forking_ this repository and changing the _steps_ and _pipeline_ configuration files.
Normandy is available via pip:

```
pip install normandy
```

----------------------------------------------------------------

@@ -39,6 +43,18 @@ So, Normandy lets you:

----------------------------------------------------------------

## Creating a Normandy project

Normandy uses a simple file structure; you can create it with the `--create-project` command, passing the path where the project should be created:

```
normandy --create-project -file-path write/project/path
```

If the given path does not exist, Normandy will create it.

This command also creates a template of the `pipeline_conf.yml` file used to configure Normandy's behavior.
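As a sketch of what that generated template might contain (an assumption pieced together from the `flows` and `confs` sections documented below — the top-level keys match what `pipeline.py` reads, but the flow, step, and path names here are illustrative):

```
flows:
  my_flow:
    tags:
      - default
    steps:
      read:
        - read_file
confs:
  path: your/project/path
  envs:
    dev:
      read:
        path: data/dev
```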

## How to use Normandy

Normandy's behavior is configured in the `pipeline_conf.yml` file, which has two sections:
@@ -65,8 +81,7 @@ A configured flow should look like:
my_flow:
tags:
- default
- main_flow
- my_flow
- sr1
steps:
read:
- read_file
@@ -114,14 +129,15 @@ A complete example:
my_flow:
tags:
- default
- salarians
steps:
read:
- read_file
- read_database
process:
main_processing:
avoid_tags:
- weekend
- hammerhead
side_processing:
avoid_tags:
- Shepard
@@ -133,12 +149,15 @@ my_flow:
error_tolerance: True
```
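The tag semantics above can be sketched as a set intersection, mirroring the flow-selection logic in Normandy's `pipeline` class (this covers only flow selection, not `avoid_tags`; the flow definitions and tag names are illustrative):

```
# A flow is selected to run when the requested tags intersect the
# flow's configured tags, as in Normandy's pipeline class.
flows = {
    "my_flow": {"tags": ["default", "salarians"]},
    "weekend_flow": {"tags": ["weekend"]},
}

def select_flows(flows, requested_tags):
    requested = set(requested_tags)
    return [name for name, data in flows.items()
            if requested.intersection(set(data["tags"]))]

print(select_flows(flows, ["default"]))  # ['my_flow']
```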

### Environment Configuration
### Pipeline Configuration

The main objective of this section is to configure your environment settings, but it is also important to declare the project's full path.

You are free to add any configuration you need under the environment section of the configuration file; you just must use the keyword "envs". For example, if you want to specify a reading and a writing path for each environment, do as follows:

```
confs:
path: your/project/path
envs:
dev:
read:
@@ -163,7 +182,7 @@ log is a Normandy logger object whose main function is to easily log your code.
Process snippet:

```
from engine.variables_storage import variables_storage
from normandy.engine.variables_storage import variables_storage
def process(pipe, log):
# Configurations
@@ -207,7 +226,7 @@ A usage example:

```
import pandas as pd
from engine.variables_storage import variables_storage
from normandy.engine.variables_storage import variables_storage
def process(pipe, log):
# Get confs
@@ -226,22 +245,22 @@ def process(pipe, log):

### How to run it

The Normandy CLI interface is still under construction, but you may run your pipeline with the command:
To run the Normandy pipeline, use the `run-pipeline` command from the project directory:

```
python main.py
```
```
normandy --run-pipeline
```

With this command the default flow would run on the defined dev environment.
With this command, the default flow runs on the defined _dev_ environment.

To specify tags you may use the tag parameter as follow:
To specify tags, use the `-tags` parameter:

```
python main.py -tags default -tags weekend
normandy --run-pipeline -tags my_tag -tags sr2
```

Finally, to specify the environment, use the `-env` parameter:

```
python main.py -env prod
normandy --run-pipeline -env prod
```
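Putting the pieces together, a process module's `process(pipe, log)` function can be exercised in isolation. This is a hedged sketch: `DummyPipe` and `DummyLog` are illustrative stand-ins for Normandy's real pipeline and logger objects, and the paths are made up.

```
# Illustrative stand-ins for Normandy's pipeline and logger objects.
class DummyLog:
    def info(self, msg):
        print(msg)

class DummyPipe:
    def get_env_confs(self):
        # Mimics the "envs" entry of pipeline_conf.yml for the active environment
        return {"read": {"path": "data/dev/in"}, "write": {"path": "data/dev/out"}}

def process(pipe, log):
    # A process receives the pipeline and a logger, pulls its environment
    # configuration, does its work, and logs progress.
    confs = pipe.get_env_confs()
    log.info(f"Reading from {confs['read']['path']}")
    return confs["write"]["path"]

process(DummyPipe(), DummyLog())
```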
23 changes: 16 additions & 7 deletions src/normandy/engine/pipeline.py
@@ -1,5 +1,6 @@

class pipeline:
# Main class - in charge of orchestrating the processes.

from datetime import datetime
from multiprocessing import Pool
@@ -12,21 +13,25 @@ def __init__(self, env, tags):
self.tags = set(tags)
with open("pipeline/pipeline_conf.yml") as file:
confs = load(file, Loader=SafeLoader)

# Define flows with the given tags
self.__flows__ = [self.flow(data, flow_name, self.tags) for flow_name, data in confs["flows"].items() if self.tags.intersection(set(data["tags"]))]

# Read the project configurations
self.__confs__ = confs["confs"]
self.__confs__["active_env"] = env
self.__log_level__ = 0

def set_path(self):
import os

os.chdir(self.__confs__["path"])
return
def get_path(self):
# Return the project path
return self.__confs__['path']

def get_env_confs(self):
# Return the active environment configurations
return self.__confs__["envs"][self.__confs__["active_env"]]

def __step_runner__(self, step):
# Run a step, executing its processes in parallel.
_t = self.datetime.now()
log = self.logger(step, self.__log_level__)
with self.Pool(step.processes_number()) as pool:
@@ -38,9 +43,11 @@ def __step_runner__(self, step):
log.info(f"Step successfully ended at {self.datetime.now()} - Step time: {self.datetime.now() - _t}")

def __process_runner__(self, process):
# Run the selected process
process.execute(self)

def __flow_runner__(self, flow):
# Run the defined data flow, step by step

_t = self.datetime.now()
log = self.main_logger(flow, self.__log_level__)
@@ -54,6 +61,7 @@ def __flow_runner__(self, flow):
log.write(f"flow successfully ended at {self.datetime.now()} - Flow time: {self.datetime.now() - _t}")

def start_pipeline(self):
# Execute the pipeline
for fl in self.__flows__:
self.__flow_runner__(fl)

@@ -121,11 +129,12 @@ def execute(self, pipe):
_t = datetime.now()
log = logger(self, pipe.__log_level__)

spec = spec_from_file_location(f"{self.__from_step__}.{self.__name__}", f"{pipe.__confs__['path']}/pipeline/{self.__from_step__}/{self.__name__}.py")
# Import and execute the user-defined module
spec = spec_from_file_location(f"{self.__from_step__}.{self.__name__}", f"{pipe.get_path()}/pipeline/{self.__from_step__}/{self.__name__}.py")
module = module_from_spec(spec)
spec.loader.exec_module(module)
try:
exit_vars = module.process(pipe, log)
module.process(pipe, log)
except Exception as e:
if not self.__error_tolerance__:
raise step_error("Step exception without errors tolerance")
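The error handling added to `execute` above can be sketched in isolation. This is an illustrative sketch, not Normandy's API: `run_process` and the failing callable are stand-ins for a process execution and a user process.

```
class step_error(Exception):
    pass

def run_process(process, error_tolerance, log):
    # Re-raise failures as step_error unless the flow tolerates errors,
    # mirroring the try/except in execute().
    try:
        process()
        return True
    except Exception as e:
        if not error_tolerance:
            raise step_error("Step exception without errors tolerance")
        log.append(f"Tolerated step failure: {e}")
        return False

log = []
run_process(lambda: 1 / 0, True, log)  # failure tolerated and logged
```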
