81 changes: 81 additions & 0 deletions File Upload Service/app/READCONFIG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# config.yaml

This document explains what each line of the `config.yaml` file for the configurable-data-preprocessing-pipeline means and how it should be written to avoid errors.

# tabular
This is the top-level key of the config; all preprocessing styles and steps are nested under it.

# file_type
This specifies the type of file to be preprocessed. For now, `config.yaml` accepts only a `csv` or `json` file; any other file type will lead to an error.

# preprocessing
All preprocessing options are stated under this key; they are broken down into `cleaning`, `transformation`, and `validation` subsections.

# cleaning
Under `cleaning` we have the `drop_columns`, `dropna`, `drop_duplicates`, and `rename_columns` steps, all explained below.

# drop_columns
This section under `cleaning` lists which columns the user would like to drop (i.e. not include in the output dataframe). Note: each column should be written exactly as it appears in the dataframe, prefixed with a dash (-), e.g. `- product`.
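For example, to drop two hypothetical columns named `order_id` and `notes` (placeholder names, not from the real dataset), this part of the config would look like:

```yaml
cleaning:
  drop_columns:
    - order_id
    - notes
```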

# dropna
This section, still under `cleaning`, asks whether the user wants to drop rows with null or empty values. It takes a `true` or `false` response.

# drop_duplicates
Still under `cleaning`. This drops duplicate rows to reduce noise in the dataset. It takes a `true` or `false` response.

# rename_columns
This option renames columns: the old name comes first, then the new name, for example `sales: revenue`.
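A small sketch with placeholder column names:

```yaml
cleaning:
  rename_columns:
    sales: revenue
    qty: quantity
```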

# transformation
This is the next section under `preprocessing`. It contains `categorical_encoding`, `fillna`, and `normalize` as subsections, explained below.

# categorical_encoding
This option allows the user to encode categorical (string) values into numeric values. To use it, write the column you want to encode like `- column1` and the pipeline converts its values into numbers.
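As a rough standalone sketch of what the pipeline does under the hood with scikit-learn's `LabelEncoder` (the `colour` column here is made up for illustration):

```python
import pandas as pd
from sklearn.preprocessing import LabelEncoder

df = pd.DataFrame({"colour": ["red", "blue", "red", "green"]})

# LabelEncoder assigns each distinct string an integer label
# (labels follow the sorted order of the unique values: blue=0, green=1, red=2)
le = LabelEncoder()
df["colour"] = le.fit_transform(df["colour"].astype(str))

print(df["colour"].tolist())  # [2, 0, 2, 1]
```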

# fillna
This acts as the opposite of the `dropna` option: instead of dropping rows with missing values, `fillna` fills them in, using the column mean, median, or mode, or any value you enter. It is written like `column: mean`.
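A minimal sketch of the `mean` case, using a made-up `score` column:

```python
import pandas as pd

df = pd.DataFrame({"score": [10.0, None, 30.0]})

# "score: mean" in the config would fill the gap with the column mean (20.0)
df["score"] = df["score"].fillna(df["score"].mean())

print(df["score"].tolist())  # [10.0, 20.0, 30.0]
```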

# normalize
Currently we use `StandardScaler` to normalize data. The `columns` key is kept as an empty list since no columns are being normalized; when normalization is needed, each column is written as `- column` under the `columns` key of `normalize`.

# validation
# dtype_conversion
This handles data type conversion, such as casting numerical columns to `float` or `int`, or converting date strings to `datetime`.
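A small sketch of the datetime case, assuming a `date` column of strings:

```python
import pandas as pd

df = pd.DataFrame({"date": ["2024-01-15", "not a date"]})

# errors="coerce" turns unparseable values into NaT instead of raising
df["date"] = pd.to_datetime(df["date"], errors="coerce")

print(df["date"].dtype)  # datetime64[ns]
```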



# Below is the format for the `config.yaml` file
```
tabular:
  file_type: #csv or json
  preprocessing:
    cleaning:
      drop_columns:
        # - column1
        # - column2
      dropna: true
      drop_duplicates: true
      rename_columns:
        # old_name1: new_name1
        # old_name2: new_name2
    transformation:
      categorical_encoding:
        columns:
          # - column1
          # - column2
      fillna:
        columns:
          # column1: mean
          # column2: median
          # column3: mode
          # column4: value
      normalize:
        columns: []
    validation:
      dtype_conversion:
        # - date: datetime
```


NOTE: Once all configurations have been made, the pipeline should process the file correctly with no errors.
42 changes: 42 additions & 0 deletions File Upload Service/app/README.md
@@ -0,0 +1,42 @@
# CONFIGURABLE PREPROCESSING PIPELINE

This README explains everything in the configurable data preprocessing pipeline and how it works.

# cleaned_files
This folder is where all cleaned and processed files are stored.

# temp
This is a temporary folder created by the Streamlit app to handle the uploaded file; the original uploaded file is also stored here, so there is a reference for the cleaned files.

# config.yaml
This is where configuration for the pipeline is made. For more info, check `READCONFIG.md`.

# requirements.txt
All installed dependencies are listed here.

# stream.py
Contains the interface to upload the file and the Streamlit script that runs the pipeline, started with `streamlit run stream.py`.

# tabular_pipeline.py
This is the main pipeline where the preprocessing steps are performed.

The pipeline uses these imports:

- `pandas`: for data manipulation
- `yaml`: loads `config.yaml`
- `os`: handles file paths and directories
- `logging`: logs the actions taken in the pipeline
- `StringIO`: stores logs in memory instead of printing them
- `datetime`: generates timestamped file names
- `sklearn.preprocessing`: `StandardScaler` for data normalization; `LabelEncoder` for encoding categorical columns to integers

All these libraries contribute to the pipeline structure, and each does what it is supposed to do according to the preset `config.yaml`.
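To illustrate the normalization step, here is a small standalone sketch of what `StandardScaler` does (rescale to zero mean, unit variance) on a made-up `height` column:

```python
import pandas as pd
from sklearn.preprocessing import StandardScaler

df = pd.DataFrame({"height": [150.0, 160.0, 170.0]})

# StandardScaler rescales each column to mean 0 and (population) std 1
df[["height"]] = StandardScaler().fit_transform(df[["height"]])

print(df["height"].round(4).tolist())  # [-1.2247, 0.0, 1.2247]
```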

This pipeline is backed by a Streamlit app, which acts as the user interface and is where files are uploaded.

Steps to run the pipeline:

1. Set up the config file, using `READCONFIG.md` as a guide.
2. Run `streamlit run stream.py` for the upload interface.
3. Upload the file and check the logging steps in the same interface to follow the progress of the pipeline.
4. Once done, the preprocessed file is stored automatically in the `cleaned_files` directory.
28 changes: 28 additions & 0 deletions File Upload Service/app/config.yaml
@@ -0,0 +1,28 @@
tabular:
  file_type: #csv or json
  preprocessing:
    cleaning:
      drop_columns:
        # - column1
        # - column2
      dropna: true
      drop_duplicates: true
      rename_columns:
        # old_name1: new_name1
        # old_name2: new_name2
    transformation:
      categorical_encoding:
        columns:
          # - column1
          # - column2
      fillna:
        columns:
          # column1: mean
          # column2: median
          # column3: mode
          # column4: value
      normalize:
        columns: []
    validation:
      dtype_conversion:
        # - date: datetime
9 changes: 5 additions & 4 deletions File Upload Service/app/requirements.txt
@@ -1,4 +1,5 @@
-streamlit==1.25.0
-minio==7.1.11
-python-dotenv==1.0.0
-pyspark==3.5.0
+pandas
+pyyaml
+scikit-learn
+category_encoders
+streamlit
30 changes: 30 additions & 0 deletions File Upload Service/app/stream.py
@@ -0,0 +1,30 @@
import streamlit as st
import os
import yaml
from tabular_pipeline import pipeline

with open("config.yaml", "r") as f:
    config = yaml.safe_load(f)

pipe = pipeline(config)

st.title("Configurable Data Preprocessing Pipeline")

uploaded_file = st.file_uploader("Upload your file (must be CSV or JSON)", type=["csv", "json"])

if uploaded_file:
    # save the upload into a temp folder so the pipeline can read it from disk
    os.makedirs("temp", exist_ok=True)
    file = os.path.join("temp", uploaded_file.name)
    with open(file, "wb") as f:
        f.write(uploaded_file.getbuffer())

    st.success(f"Uploaded {uploaded_file.name}")

    if st.button("Preprocess Data"):
        try:
            output_file, logs = pipe.run(file)
            st.success("Preprocessing complete! Please check the 'cleaned_files' folder in the directory.")
            st.text_area("Processing Logs", logs, height=300)
        except Exception as e:
            st.error(f"Error: {e}")
154 changes: 154 additions & 0 deletions File Upload Service/app/tabular_pipeline.py
@@ -0,0 +1,154 @@
import pandas as pd
import yaml
import os
import logging
from io import StringIO
from datetime import datetime
from sklearn.preprocessing import StandardScaler, LabelEncoder


with open("config.yaml", "r") as f:
    config = yaml.safe_load(f)

log = StringIO()

sh = logging.StreamHandler(log)
sh.setLevel(logging.INFO)

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(sh)


# defining the pipeline procedure
class pipeline:
    def __init__(self, config):
        self.config = config
        self.scale = StandardScaler()
        self.le = LabelEncoder()

    def file_type(self, file_name):
        file = self.config["tabular"].get("file_type", "csv").lower()

        if file == "csv":
            df = pd.read_csv(file_name)
            logger.info(f"Loaded the input CSV file {file_name}")
        elif file == "json":
            df = pd.read_json(file_name)
            logger.info(f"Loaded the input JSON file {file_name}")
        else:
            raise ValueError(f"File type {file} not supported")
        return df

    def cleaning(self, df):
        cleaning = self.config["tabular"]["preprocessing"]["cleaning"]

        # column filtering or dropping ("or []" guards against a key left empty in the YAML)
        drop = cleaning.get("drop_columns") or []
        if drop:
            df.drop(columns=drop, errors="ignore", inplace=True)
            logger.info(f"Dropped the following columns: {drop}")

        # drop missing values
        if cleaning.get("dropna", False):
            df.dropna(inplace=True)
            logger.info("Dropped rows with missing values")

        # drop duplicate rows
        if cleaning.get("drop_duplicates", False):
            df.drop_duplicates(inplace=True)
            logger.info("Dropped duplicated rows")

        # renaming columns
        rename_dict = cleaning.get("rename_columns") or {}
        if rename_dict:
            df.rename(columns=rename_dict, inplace=True)
            logger.info(f"Renamed the following columns: {rename_dict}")

        return df

    def transformation(self, df):
        transformation = self.config["tabular"]["preprocessing"]["transformation"]

        # encoding with LabelEncoder
        encode = (transformation.get("categorical_encoding") or {}).get("columns") or []
        for col in encode:
            df[col] = self.le.fit_transform(df[col].astype(str))
        if encode:
            logger.info(f"Encoded these columns: {encode}")

        # filling missing values in columns
        fillna = (transformation.get("fillna") or {}).get("columns") or {}
        for col, method in fillna.items():
            if method == "mean":
                df[col] = df[col].fillna(df[col].mean())
            elif method == "median":
                df[col] = df[col].fillna(df[col].median())
            elif method == "mode":
                df[col] = df[col].fillna(df[col].mode().iloc[0])
            else:
                df[col] = df[col].fillna(method)
            logger.info(f"Filled null values in {col} using {method}")

        # normalizing numeric columns
        norm = (transformation.get("normalize") or {}).get("columns") or []
        if norm:
            df[norm] = self.scale.fit_transform(df[norm])
            logger.info(f"Normalized the following columns: {norm}")

        return df

    def validation(self, df):
        validation = self.config["tabular"]["preprocessing"]["validation"]

        # validating data types ("or []" guards against an empty dtype_conversion key)
        for cols in validation.get("dtype_conversion") or []:
            for col, dtype in cols.items():
                try:
                    if dtype == "int":
                        df[col] = df[col].astype(int)
                    elif dtype == "float":
                        df[col] = df[col].astype(float)
                    elif dtype == "str":
                        df[col] = df[col].astype(str)
                    elif dtype == "datetime":
                        df[col] = pd.to_datetime(df[col], errors="coerce")
                    logger.info(f"Converted {col} to {dtype}")
                except Exception:
                    logger.warning(f"Could not convert {col} to {dtype}")

        return df

    def save(self, df, input_file, output_dir="cleaned_files"):
        os.makedirs(output_dir, exist_ok=True)
        base = os.path.basename(input_file)
        name, end = os.path.splitext(base)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_file = os.path.join(output_dir, f"processed_{name}_{timestamp}{end}")

        if end == ".csv":
            df.to_csv(output_file, index=False)
        elif end == ".json":
            df.to_json(output_file, orient="records")
        else:
            raise ValueError("Output file must be csv or json")
        logger.info(f"Saved file to {output_file}")

        return output_file

    def run(self, input_file):
        df = self.file_type(input_file)
        df = self.cleaning(df)
        df = self.transformation(df)
        df = self.validation(df)
        output_file = self.save(df, input_file)

        logs = log.getvalue()

        return output_file, logs
21 changes: 21 additions & 0 deletions README.md
@@ -1,2 +1,23 @@
# redback-data-warehouse
Data Warehouse storage of code and configurations

## Garmin Run Data – ETL Pipeline Update

This ETL pipeline processes `Garmin_run_data.csv` and includes:

### Data cleaning:
- Removes duplicate rows
- Standardizes column names (lowercase, underscores)
- Converts timestamps to datetime
- Fills missing numeric values with column means
- Removes outliers in `heart_rate` (keeps values between 30–220 bpm)
- Converts distance from meters to kilometers
- Converts speed from m/s to km/h
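The outlier and unit-conversion steps above could be sketched roughly like this (the column names `heart_rate`, `distance`, and `speed` are assumed from the list, not taken from the actual script):

```python
import pandas as pd

df = pd.DataFrame({
    "heart_rate": [75, 250, 10],            # bpm; 250 and 10 are out of range
    "distance": [5000.0, 3000.0, 1000.0],   # meters
    "speed": [3.0, 2.5, 2.0],               # m/s
})

# keep only plausible heart rates (30-220 bpm)
df = df[df["heart_rate"].between(30, 220)]

# meters -> kilometers, m/s -> km/h
df["distance_km"] = df["distance"] / 1000
df["speed_kmh"] = df["speed"] * 3.6
```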

### Data aggregation:
- Groups data by year and week
- Calculates total runs, total distance (km), average speed (km/h), and average pace (min/km) per week
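The weekly aggregation could be sketched like this (assuming a cleaned frame with `timestamp`, `distance_km`, and `speed_kmh` columns; names are illustrative):

```python
import pandas as pd

df = pd.DataFrame({
    "timestamp": pd.to_datetime(["2024-01-01", "2024-01-03", "2024-01-10"]),
    "distance_km": [5.0, 3.0, 4.0],
    "speed_kmh": [10.0, 12.0, 8.0],
})

# group by ISO year and week, then aggregate per week
iso = df["timestamp"].dt.isocalendar()
weekly = df.groupby([iso.year, iso.week]).agg(
    total_runs=("distance_km", "size"),
    total_distance_km=("distance_km", "sum"),
    avg_speed_kmh=("speed_kmh", "mean"),
)

# pace (min/km) derived from the weekly average speed
weekly["avg_pace_min_per_km"] = 60 / weekly["avg_speed_kmh"]
```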

### Outputs:
- `cleaned_garmin_run_data.csv` → cleaned dataset

Binary file added Requirement Gathering (4).pdf
Binary file not shown.