Initial commit
hgrif committed Nov 13, 2016
0 parents commit 5351e24
Showing 16 changed files with 388 additions and 0 deletions.
93 changes: 93 additions & 0 deletions .gitignore
@@ -0,0 +1,93 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
.hypothesis/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# IPython Notebook
.ipynb_checkpoints

# pyenv
.python-version

# celery beat schedule file
celerybeat-schedule

# dotenv
.env

# virtualenv
.venv/
venv/
ENV/

# Spyder project settings
.spyderproject

# Rope project settings
.ropeproject

# IntelliJ
.idea
44 changes: 44 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Oozie & PySpark workflow

This project demonstrates an Oozie workflow with a PySpark action. It assumes that all the PySpark logic is in a Python library that only needs a `HiveContext` and a date to run. The Python library is distributed to all the workers on the cluster, and a pipeline within the library is kicked off daily, depending on some data sources.

An Oozie workflow with a PySpark action is non-trivial, because:

1. Oozie doesn't have native support for a PySpark action.
2. YARN can kill your Oozie container when running PySpark in a shell.
3. Distributing eggs with PySpark can be a challenge due to problems with the `PYTHON_EGG_CACHE` (depending on cluster setup).

Issue 1 is solved by calling `spark-submit` in a `shell` action; issue 2 by setting the right configuration in the workflow definition; and issue 3 by setting the `PYTHON_EGG_CACHE` for the driver in the calling Python script and for the executors in the `spark-submit` call.
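
Concretely, the egg-cache fix has two halves (a condensed sketch of `bin/run_pipeline.py` and `bin/submit_script.sh`, both shown below):

```
# Driver half, set in the calling Python script (run_pipeline.py):
#     os.environ['PYTHON_EGG_CACHE'] = '/tmp/'
# Executor half, plus egg distribution, set in the spark-submit call:
spark-submit \
    --conf spark.executorEnv.PYTHON_EGG_CACHE=/tmp/ \
    --py-files=project-1.0-py3.5.egg \
    run_pipeline.py "$date"
```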

Note: this repo will crash as-is (and this time it's not Oozie's fault!): it is a stripped/anonymized version of a workflow. Change the references to namenodes, HDFS paths, Oozie URLs, `/user/` directories, etc. to match your cluster.


## Submitting a new workflow

Create a Python egg by cd-ing into the Python project directory and building it, then copy the egg to `dist/` in the Oozie workflow folder:
```
$ cd python-project/
$ python setup.py bdist_egg
$ cp dist/project-1.0-py3.5.egg ../oozie-workflow/dist/
```

Run `update_and_run.sh` to upload `oozie-workflow/` to HDFS and run the Oozie coordinator.
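
`update_and_run.sh` itself is not rendered in this commit view; a minimal sketch of what such a script could do, assuming the HDFS paths from `job.properties` and a placeholder Oozie URL:

```
#!/bin/sh
# Hypothetical sketch -- adjust the Oozie URL and HDFS paths to your cluster.
set -e

# Refresh the workflow definition on HDFS.
hdfs dfs -rm -r -f /user/admin/workflows/oozie-workflow
hdfs dfs -put oozie-workflow /user/admin/workflows/

# Start the coordinator.
oozie job -oozie http://oozie-host:11000/oozie \
    -config oozie-workflow/job.properties -run
```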


## Folders


#### `oozie-workflow/`

Coordinator to run a workflow.

* `job.properties`: Coordinator properties (start & end date, job tracker, etc.).
* `coordinator.xml`: Coordinator setting the data dependencies.
* `workflow.xml`: Workflow specification.
* `bin/`: Scripts used in the workflow.
* `dist/`: Folder with the Python egg used in the workflow.


#### `python-project/`

The Python project. Project code lives in `pythonproject/`; egg build instructions are in `setup.py`.
20 changes: 20 additions & 0 deletions oozie-workflow/bin/run_pipeline.py
@@ -0,0 +1,20 @@
import sys

import pandas as pd

from pyspark import SparkContext
from pyspark.sql import HiveContext

# You may not need this:
# Change .python-eggs folder because my user doesn't have a $HOME on the
# cluster, resulting in PYTHON_EGG_CACHE errors on the driver.
import os
os.environ['PYTHON_EGG_CACHE'] = '/tmp/'
os.environ['PYTHON_EGG_DIR'] = '/tmp/'

import pythonproject


if __name__ == '__main__':
date = pd.to_datetime(sys.argv[1])
hc = HiveContext(SparkContext.getOrCreate())
pythonproject.pipeline.run(hc, date)
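
To try the driver script by hand outside Oozie (a sketch, assuming `spark-submit` is on the `PATH` and the egg has been built):

```
spark-submit --master yarn-client \
    --py-files project-1.0-py3.5.egg \
    run_pipeline.py 2016-11-13
```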
37 changes: 37 additions & 0 deletions oozie-workflow/bin/submit_script.sh
@@ -0,0 +1,37 @@
#!/bin/sh


set -e

if [ $# -ne 2 ]
then
echo "Not enough arguments supplied"
exit 1
fi

export SPARK_HOME="/opt/cloudera/parcels/CDH/lib/spark"
# Note: Add the current dir in the Oozie container to PYTHON_PATH so Spark can
# use it (YMMV).
export PYTHONPATH="/opt/cloudera/parcels/CDH/lib/spark/python:/opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.9-src.zip:."
export PYSPARK_PYTHON="/opt/cloudera/parcels/Anaconda/bin/python"
export HADOOP_USER_NAME=admin

script=$1
date=$2

sleep 10  # Sleep a bit so that you can be quick enough to catch the log in Hue.

echo "Running in script $script; date $date"

# You may not need this: change the .python-eggs folder because my user
# doesn't have a $HOME on the cluster, resulting in PYTHON_EGG_CACHE errors
# on the executors. Note that both the egg and the script are put in the
# base folder of the Oozie container, so the egg can be referenced by name.
spark-submit \
    --master yarn-client \
    --driver-memory=8g \
    --executor-memory=3g \
    --conf spark.executorEnv.PYTHON_EGG_CACHE=/tmp/ \
    --py-files=project-1.0-py3.5.egg \
    "$script" "$date"
44 changes: 44 additions & 0 deletions oozie-workflow/coordinator.xml
@@ -0,0 +1,44 @@
<coordinator-app name="coordinator"
frequency="${coord:days(1)}"
start="${jobStart}"
end="${jobEnd}"
timezone="UTC"
xmlns="uri:oozie:coordinator:0.4">

<controls>
<!-- See http://stackoverflow.com/a/21818132 -->
<timeout>-1</timeout>
<concurrency>1</concurrency>
<execution>FIFO</execution>
<throttle>1</throttle>
</controls>

<datasets>
<dataset name="data-in-hdfs" frequency="${coord:hours(1)}" initial-instance="2015-11-01T00:00Z" timezone="UTC">
<uri-template>hdfs://nameservice1/data/hive/warehouse/database.db/table/utc_date=${YEAR}-${MONTH}-${DAY}/utc_hour=${HOUR}</uri-template>
<done-flag></done-flag>
</dataset>
</datasets>

<input-events>
<data-in name="event-data-in-hdfs" dataset="data-in-hdfs">
<!-- Wait for an instance a couple of hours past the nominal time. -->
<instance>${coord:current(2)}</instance>
</data-in>
</input-events>

<action>
<workflow>
<app-path>${workflowRoot}/workflow.xml</app-path>
<configuration>
<!-- Pass the date to the workflow. -->
<property>
<name>date</name>
<value>${coord:nominalTime()}</value>
</property>
</configuration>
</workflow>
</action>


</coordinator-app>
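
The empty `<done-flag/>` means an instance counts as available as soon as its directory exists. To check by hand what the coordinator is waiting for, list a materialized URI (a sketch for the 2016-08-01 00:00 instance):

```
hdfs dfs -ls "hdfs://nameservice1/data/hive/warehouse/database.db/table/utc_date=2016-08-01/utc_hour=00"
```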
Empty file added oozie-workflow/dist/.gitignore
Empty file.
6 changes: 6 additions & 0 deletions oozie-workflow/job.properties
@@ -0,0 +1,6 @@
nameNode=hdfs://nameservice1
jobTracker=namenode002:8032
jobStart=2016-08-01T00:00Z
jobEnd=2016-10-01T00:00Z
workflowRoot=/user/admin/workflows/oozie-workflow/
oozie.coord.application.path=/user/admin/workflows/oozie-workflow/coordinator.xml
63 changes: 63 additions & 0 deletions oozie-workflow/workflow.xml
@@ -0,0 +1,63 @@
<workflow-app name="workflow" xmlns="uri:oozie:workflow:0.5">

<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<!-- Configuration so YARN doesn't kill the AM/launcher. -->
<property>
<name>oozie.launcher.mapreduce.map.memory.mb</name>
<value>9216</value>
</property>
<property>
<name>oozie.launcher.mapreduce.map.java.opts</name>
<value>-Xmx6144m</value>
</property>
<property>
<name>oozie.launcher.yarn.app.mapreduce.am.resource.mb</name>
<value>768</value>
</property>
<!-- Properties that I have set but am not sure I need. -->
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>launcher</value>
</property>
<property>
<name>mapred.task.timeout</name>
<value>7200000</value>
</property>
<property>
<name>mapreduce.reduce.shuffle.connect.timeout</name>
<value>7200000</value>
</property>
<property>
<name>mapreduce.reduce.shuffle.read.timeout</name>
<value>7200000</value>
</property>
</configuration>
</global>

<start to="bash-pipeline"/>

<!-- Bash script to do the spark-submit. The version numbers of these actions are magic. -->
<action name="bash-pipeline">
<shell xmlns="uri:oozie:shell-action:0.3">
<exec>submit_script.sh</exec>
<argument>run_pipeline.py</argument>
<argument>${date}</argument> <!-- This got passed from the coordinator. -->
<file>${workflowRoot}/bin/submit_script.sh</file>
<file>${workflowRoot}/bin/run_pipeline.py</file>
<file>${workflowRoot}/dist/project-1.0-py3.5.egg</file>
</shell>
<ok to="end"/>
<error to="fail"/>
</action>

<!-- You wish you'd ever get Oozie errors. -->
<kill name="fail">
<message>Bash action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>

<end name="end"/>

</workflow-app>
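
Before uploading, the definition can be checked against the Oozie schema with the stock CLI (a sketch; depending on the client version the command may also need an `-oozie <url>` flag):

```
oozie validate workflow.xml
```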
Empty file added python-project/dist/.gitignore
Empty file.
2 changes: 2 additions & 0 deletions python-project/pythonproject/__init__.py
@@ -0,0 +1,2 @@
from . import foo
from . import bar
8 changes: 8 additions & 0 deletions python-project/pythonproject/bar.py
@@ -0,0 +1,8 @@
def process(hive_context, date):
print('Processing bar')
print(hive_context, date)


def compute(hive_context, date):
print('Computing bar')
print(hive_context, date)
8 changes: 8 additions & 0 deletions python-project/pythonproject/foo.py
@@ -0,0 +1,8 @@
def process(hive_context, date):
print('Processing foo')
print(hive_context, date)


def compute(hive_context, date):
print('Computing foo')
print(hive_context, date)
Empty file.
16 changes: 16 additions & 0 deletions python-project/pythonproject/pipeline.py
@@ -0,0 +1,16 @@
import pythonproject


def _foo(hive_context, date):
pythonproject.foo.process(hive_context, date)
pythonproject.foo.compute(hive_context, date)


def _bar(hive_context, date):
pythonproject.bar.process(hive_context, date)
pythonproject.bar.compute(hive_context, date)


def run(hive_context, date):
_foo(hive_context, date)
_bar(hive_context, date)
22 changes: 22 additions & 0 deletions python-project/setup.py
@@ -0,0 +1,22 @@
from setuptools import setup, find_packages

setup(
name='project',

version='1.0',

description='Python Project',

author='Henk Griffioen',

packages=find_packages(exclude=['bin', 'notebooks']),

# In case you're adding model files to your package (bundling data like
# this may make people angry). Note the key must be an actual package name.
package_data={
    'pythonproject': ['models/dummy.pickle'],
},
)

# More info on data files:
# https://www.metabrite.com/devblog/posts/including-data-files-in-python-packages/
# https://setuptools.readthedocs.io/en/latest/setuptools.html#including-data-files