diff --git a/File Upload Service/app/CONFIG_DOCUMENTATION.md b/File Upload Service/app/CONFIG_DOCUMENTATION.md new file mode 100644 index 0000000..e9340d7 --- /dev/null +++ b/File Upload Service/app/CONFIG_DOCUMENTATION.md @@ -0,0 +1,252 @@ + +# Configuration File (`config.yaml`) Documentation + +This document explains each configuration option in your YAML file, its purpose, accepted values, and how it affects preprocessing. + +--- + +## 1. Tabular Section + +```yaml +tabular: + path: data/sample.csv + output_folder: output/tabular + type: csv + preprocessing: + add_row_id: true + categorical_encoding: + columns: [gender, city] + method: onehot + cleaning: + lowercase: true + remove_special_chars: false + trim_strings: true + column_filtering: + drop: [] + keep: [age, income, gender, city] + drop_duplicates: true + dtype_conversion: + age: int + gender: category + income: float + missing_values: + columns: + age: 30 + name: Unknown + global_fill: null + normalization: + columns: [age, income] + method: minmax + outlier_removal: + columns: [age, income] + method: iqr + threshold: 1.5 + remove_empty_columns: true + rename_columns: + oldName: new_name + productID: product_id +``` + +### Explanation of Fields + +- **`path`** + - *Type:* string + - *Description:* File path to your raw tabular dataset (CSV or JSON). + - *Example:* `"data/sample.csv"` + - *Notes:* Must exist and be readable. + +- **`output_folder`** + - *Type:* string + - *Description:* Directory where the processed tabular output will be saved as CSV. + - *Example:* `"output/tabular"` + - *Notes:* Directory will be created if missing. + +- **`type`** + - *Type:* string (`csv` or `json`) + - *Description:* Specifies the format of the input tabular file. + - *Example:* `"csv"` + +--- + +### `preprocessing` options + +Each key under `preprocessing` is an optional step. If omitted, that step is skipped. + +#### `add_row_id` +- *Type:* boolean +- *Description:* Adds a unique row ID column named `row_id` at the beginning of the DataFrame. + +#### `categorical_encoding` +- *Type:* dict +- *Fields:* + - `columns` (list of strings): Columns to encode. + - `method` (string): Encoding method. Supported: + - `"onehot"` — creates one-hot encoded dummy variables. + - `"label"` — converts categories to integer labels. +- *Notes:* Apply after filtering and cleaning. + +#### `cleaning` +- *Type:* dict +- *Fields:* + - `lowercase` (bool): Convert string columns to lowercase. + - `remove_special_chars` (bool): (Not implemented yet — reserved for future) Remove special characters in strings. + - `trim_strings` (bool): Remove leading/trailing whitespace from string columns. + +#### `column_filtering` +- *Type:* dict +- *Fields:* + - `keep` (list of strings): Keep only these columns (drop others). If specified, overrides `drop`. + - `drop` (list of strings): Drop these columns. Ignored if `keep` is present. +- *Note:* Filtering should be done early for efficiency. + +#### `drop_duplicates` +- *Type:* boolean +- *Description:* Remove exact duplicate rows. + +#### `dtype_conversion` +- *Type:* dict +- *Fields:* key = column name, value = target data type (e.g., `int`, `float`, `category`, `str`). +- *Example:* + ```yaml + dtype_conversion: + age: int + income: float + ``` + +#### `missing_values` +- *Type:* dict +- *Fields:* + - `columns`: dict mapping column names to fill values for missing entries. + - `global_fill`: single value to fill all missing values if specified (overrides column-specific fills). Use `null` to disable. +- *Example:* + ```yaml + missing_values: + columns: + age: 30 + name: Unknown + global_fill: null + ``` + +#### `normalization` +- *Type:* dict +- *Fields:* + - `columns` (list): Columns to normalize. + - `method` (string): Normalization method: + - `"minmax"` — scale values to [0,1] range. + - `"standard"` — zero mean, unit variance scaling. + +#### `outlier_removal` +- *Type:* dict +- *Fields:* + - `columns` (list): Columns to check for outliers. + - `method` (string): Method to detect outliers: + - `"iqr"` — Interquartile Range method. + - `"zscore"` — Z-score thresholding. + - `threshold` (float): Threshold multiplier, e.g. 1.5 for IQR, or z-score limit. + +#### `remove_empty_columns` +- *Type:* boolean +- *Description:* Remove columns that contain only missing values. + +#### `rename_columns` +- *Type:* dict +- *Description:* Mapping from original column names to new names. +- *Example:* + ```yaml + rename_columns: + oldName: new_name + productID: product_id + ``` + +--- + +## 2. Images Section + +```yaml +images: + path: temp_input/images + output_folder: temp_output/images + preprocessing: + grayscale: true + normalize: true + resize: [128, 128] +``` + +- **`path`** + Folder containing input images. + +- **`output_folder`** + Folder to save processed images. + +- **`preprocessing`** + - `grayscale` (bool): Convert images to grayscale. + - `normalize` (bool): Normalize pixel values to [0,1]. + - `resize` (list of two ints): Resize images to `[width, height]`. + +--- + +## 3. Videos Section + +```yaml +videos: + path: data/videos/sample.mp4 + output_folder: output/video_frames + preprocessing: + extract_frames: 30 + resize_frames: [64, 64] +``` + +- **`path`** + Path to input video file. + +- **`output_folder`** + Folder to save extracted and processed frames. + +- **`preprocessing`** + - `extract_frames` (int): Number of frames to extract or interval. + - `resize_frames` (list of two ints): Resize extracted frames to `[width, height]`. + +--- + +## How to use the config file + +1. **Prepare your data and directory structure.** + Make sure your files and folders exist for the paths you set. + +2. **Edit the YAML file** with the preprocessing steps you want for each data type. + - To skip a step, just remove or comment it out. + - Use boolean flags to toggle on/off steps. + - Provide lists for columns and mappings for renaming. + +3. **Run the pipeline script.** + It will read your config and execute the steps in order. + +4. **Check output folders** for processed files and logs. + +5. **Review metadata JSON** for detailed information about each preprocessing step and statistics. + +--- +**FOLDER STRUCTURE** + +project_root/ +├── config.yaml # Config file +├── pipeline.py # Main pipeline script to run preprocessing +├── utils/ # Source code (tabular.py, images.py, videos.py) +├── app.py # Streamlit app +│ +├── data/ # Raw input data (as referenced in config) +│ ├── images/ # Raw images folder (for image preprocessing) +│ │ ├── img1.png +│ │ └── img2.jpg +│ ├── videos/ # Raw videos folder +│ │ └── sample.mp4 +│ └── sample.csv # Raw tabular CSV file +│ +├── output/ # Processed output data (created by the pipeline) +│ ├── images/ # Processed images saved here +│ ├── tabular/ # Processed CSVs saved here +│ └── video_frames/ # Extracted and resized video frames +│ +├── requirements.txt # Python dependencies for the project +└── CONFIG_DOCUMENTATION.md # Documentation for config file usage +└── README.md # Project Documentation diff --git a/File Upload Service/app/README.md b/File Upload Service/app/README.md new file mode 100644 index 0000000..747fb54 --- /dev/null +++ b/File Upload Service/app/README.md @@ -0,0 +1,82 @@ +# Data Preprocessing Pipeline + +## Overview + +This project is a configurable data preprocessing pipeline designed to handle multiple data types including tabular data (CSV/JSON), images, and videos. The pipeline reads raw input data, applies various preprocessing steps as specified in a YAML configuration file (`config.yaml`), and outputs the cleaned and transformed data into organized output folders. for morE info about how the config.yaml file works see `CONFIG_DOCUMENTATION.md` + +## Features + +- **Tabular Data Preprocessing:** + - Handling missing values with global or column-specific fills + - Encoding categorical variables (one-hot or label encoding) + - Normalization (Min-Max or Standard scaling) + - Outlier removal (IQR or z-score methods) + - Cleaning string columns (trimming, lowercasing) + - Column filtering (keep/drop specified columns) + - Data type conversions + - Duplicate removal + - Column renaming + - Adding unique row IDs + +- **Image Preprocessing:** + - Grayscale conversion + - Normalization + - Resizing + +- **Video Preprocessing:** + - Frame extraction at specified intervals + - Frame resizing + +## How it Works + +- Define preprocessing steps and file paths in `config.yaml`. +- Run the pipeline script (`preprocess.py`), which: + - Loads the config file + - Processes tabular, image, and video data as specified + - Saves processed outputs to designated folders + - Logs processing details and saves metadata JSON for each run + + **Folder Structure** + + project_root/ +├── config.yaml # Config file +├── preprocess.py # Main pipeline script to run preprocessing +├── utils/ # Source code (tabular.py, images.py, videos.py) +├── app.py # Streamlit app for testing +│ +├── data/ # Raw input data (as referenced in config) +│ ├── images/ # Raw images folder (for image preprocessing) +│ │ ├── img1.png +│ │ └── img2.jpg +│ ├── videos/ # Raw videos folder +│ │ └── sample.mp4 +│ └── sample.csv # Raw tabular CSV file +│ +├── output/ # Processed output data (created by the pipeline) +│ ├── images/ # Processed images saved here +│ ├── tabular/ # Processed CSVs saved here +│ └── video_frames/ # Extracted and resized video frames +│ +├── requirements.txt # Python dependencies for the project +└── CONFIG_DOCUMENTATION.md # Documentation for config file usage +└── README.md # Project Documentation + + + +## Getting Started + +1. Install dependencies: + + pip install -r requirements.txt + +2. Prepare your raw data inside the `data/` folder. + +3. Customize your preprocessing pipeline via config.yaml. + +4. Run the pipeline: + + python preprocess.py + +5. Run for testing: + + streamlit run app.py \ No newline at end of file diff --git a/File Upload Service/app/__init__.py b/File Upload Service/app/__init__.py new file mode 100644 index 0000000..db3e327 --- /dev/null +++ b/File Upload Service/app/__init__.py @@ -0,0 +1 @@ +# utils package diff --git a/File Upload Service/app/app.py b/File Upload Service/app/app.py new file mode 100644 index 0000000..50e6e26 --- /dev/null +++ b/File Upload Service/app/app.py @@ -0,0 +1,154 @@ +import streamlit as st +import yaml +import os +import subprocess +import pandas as pd +import json +import shutil +from datetime import datetime +from PIL import Image + +st.set_page_config(page_title="Redback Preprocessing Tester", layout="centered") +st.title("🧹 Redback Data Preprocessing Test App") + +# Unified uploader +uploaded_files = st.file_uploader( + "📁 Upload Tabular (CSV/JSON), Images (PNG/JPG), or Video (MP4)", + type=["csv", "json", "png", "jpg", "jpeg", "mp4"], + accept_multiple_files=True +) + +# Separate config uploader +config_file = st.file_uploader("📄 Upload YAML Config", type=["yaml", "yml"]) + +# Initialize containers +tabular_file = None +image_files = [] +video_file = None + +# Sort uploaded files +if uploaded_files: + for file in uploaded_files: + ext = file.name.lower().split('.')[-1] + if ext in ["csv", "json"] and tabular_file is None: + tabular_file = file + elif ext in ["png", "jpg", "jpeg"]: + image_files.append(file) + elif ext == "mp4" and video_file is None: + video_file = file + +# Proceed if config is uploaded +if config_file: + with st.spinner("Preparing files..."): + # Save config + config_path = "config.yaml" + with open(config_path, "wb") as f: + f.write(config_file.getvalue()) + + # Load config + with open(config_path, "r") as f: + config = yaml.safe_load(f) + + # Check required input folders exist, else error out + # Tabular file handling + if tabular_file and "tabular" in config: + tabular_path = os.path.join("temp_input", tabular_file.name) + if not os.path.isdir("temp_input"): + st.error("Input folder 'temp_input' does not exist. Please create it manually.") + st.stop() + with open(tabular_path, "wb") as f: + f.write(tabular_file.getvalue()) + config["tabular"]["path"] = tabular_path + if "output_folder" not in config["tabular"]: + st.error("Config 'tabular' section missing 'output_folder'.") + st.stop() + + # Image files handling + if image_files and "images" in config: + img_dir = "temp_input/images" + if not os.path.isdir(img_dir): + st.error(f"Input images folder '{img_dir}' does not exist. Please create it manually.") + st.stop() + for img in image_files: + with open(os.path.join(img_dir, img.name), "wb") as f: + f.write(img.getvalue()) + config["images"]["path"] = img_dir + if "output_folder" not in config["images"]: + st.error("Config 'images' section missing 'output_folder'.") + st.stop() + + # Video file handling + if video_file and "videos" in config: + video_path = os.path.join("temp_input", video_file.name) + if not os.path.isdir("temp_input"): + st.error("Input folder 'temp_input' does not exist. Please create it manually.") + st.stop() + with open(video_path, "wb") as f: + f.write(video_file.getvalue()) + config["videos"]["path"] = video_path + if "output_folder" not in config["videos"]: + st.error("Config 'videos' section missing 'output_folder'.") + st.stop() + + # Save updated config + with open(config_path, "w") as f: + yaml.dump(config, f) + + st.subheader("✅ Config Loaded") + st.json(config) + + # Run preprocessing + st.info("Running preprocessing pipeline...") + with st.spinner("Processing..."): + try: + result = subprocess.run(["python", "preprocess.py"], capture_output=True, text=True) + st.text(result.stdout) + if result.stderr: + st.error(result.stderr) + except Exception as e: + st.error(f"❌ Pipeline execution failed: {e}") + st.stop() + + # Tabular preview + tabular_out = os.path.join(config["tabular"]["output_folder"], "processed_tabular.csv") + if os.path.exists(tabular_out): + st.subheader("📈 Tabular Output Preview") + df_out = pd.read_csv(tabular_out) + st.dataframe(df_out.head()) + else: + st.warning(f"Tabular output file not found: {tabular_out}") + + # Image preview + img_out_dir = config["images"]["output_folder"] + if os.path.exists(img_out_dir): + img_files = sorted(os.listdir(img_out_dir))[:5] + if img_files: + st.subheader("🖼️ Processed Images Preview") + for img_name in img_files: + img_path = os.path.join(img_out_dir, img_name) + st.image(Image.open(img_path), caption=img_name) + else: + st.warning(f"Image output folder not found: {img_out_dir}") + + # Video frame preview + vid_out_dir = config["videos"]["output_folder"] + if os.path.exists(vid_out_dir): + frame_files = sorted(os.listdir(vid_out_dir))[:5] + if frame_files: + st.subheader("🎥 Processed Video Frames Preview") + for frame_name in frame_files: + frame_path = os.path.join(vid_out_dir, frame_name) + st.image(Image.open(frame_path), caption=frame_name) + else: + st.warning(f"Video frames output folder not found: {vid_out_dir}") + + # Metadata display + metadata_files = [f for f in os.listdir() if f.startswith("metadata_") and f.endswith(".json")] + if metadata_files: + latest_meta = sorted(metadata_files)[-1] + with open(latest_meta, "r") as f: + metadata = json.load(f) + st.subheader("🧾 Metadata Summary") + st.json(metadata) + + st.success("✅ Processing complete and preview displayed.") diff --git a/File Upload Service/app/config.yaml b/File Upload Service/app/config.yaml new file mode 100644 index 0000000..6cbc9f7 --- /dev/null +++ b/File Upload Service/app/config.yaml @@ -0,0 +1,66 @@ +tabular: + output_folder: output/tabular + path: data/sample.csv + preprocessing: + add_row_id: true + categorical_encoding: + columns: + - gender + - city + method: onehot + cleaning: + lowercase: true + remove_special_chars: false + trim_strings: true + column_filtering: + drop: [] + keep: + - age + - income + - gender + - city + drop_duplicates: true + dtype_conversion: + age: int + gender: category + income: float + missing_values: + columns: + age: 30 + name: Unknown + global_fill: null + normalization: + columns: + - age + - income + method: minmax + outlier_removal: + columns: + - age + - income + method: iqr + threshold: 1.5 + remove_empty_columns: true + rename_columns: + oldName: new_name + productID: product_id + type: csv + +images: + path: data/images + output_folder: output/images + preprocessing: + grayscale: true + normalize: true + resize: + - 128 + - 128 + +videos: + output_folder: output/video_frames + path: data/videos/sample.mp4 + preprocessing: + extract_frames: 30 + resize_frames: + - 64 + - 64 diff --git a/File Upload Service/app/images.py b/File Upload Service/app/images.py new file mode 100644 index 0000000..0113da4 --- /dev/null +++ b/File Upload Service/app/images.py @@ -0,0 +1,54 @@ +import os +import pandas as pd +from PIL import Image +import numpy as np +from datetime import datetime + +def save_processed_images(images, output_folder, save_as="png", logger=None): + valid_formats = ["png", "jpg", "jpeg", "bmp", "tiff"] + save_as = save_as.lower() + + if save_as not in valid_formats: + if logger: + logger.warning(f"Unsupported image format '{save_as}'. Defaulting to 'png'.") + save_as = "png" + + os.makedirs(output_folder, exist_ok=True) + + metadata = [] + + for i, img_array in enumerate(images): + img_uint8 = (img_array * 255).astype(np.uint8) + + # Convert to PIL image + if img_uint8.ndim == 2: + img_pil = Image.fromarray(img_uint8, mode='L') + else: + img_pil = Image.fromarray(img_uint8) + + # Save image + filename = f"processed_img_{i}.{save_as}" + save_path = os.path.join(output_folder, filename) + img_pil.save(save_path) + + # Collect metadata + metadata.append({ + "filename": filename, + "width": img_pil.width, + "height": img_pil.height, + "timestamp": datetime.now().isoformat(), + "processed_path": save_path + }) + + if logger: + logger.debug(f"Saved processed image {save_path}") + + # Save metadata CSV + metadata_df = pd.DataFrame(metadata) + metadata_csv_path = os.path.join(output_folder, "metadata.csv") + metadata_df.to_csv(metadata_csv_path, index=False) + + if logger: + logger.info(f"Saved image metadata to {metadata_csv_path}") + + return metadata \ No newline at end of file diff --git a/File Upload Service/app/preprocess.py b/File Upload Service/app/preprocess.py new file mode 100644 index 0000000..bfc651a --- /dev/null +++ b/File Upload Service/app/preprocess.py @@ -0,0 +1,136 @@ +import yaml +import pandas as pd +from utils import tabular, images, videos +import logging +from datetime import datetime +import json +import os + +def setup_logger(log_path='pipeline.log'): + logger = logging.getLogger('DataPreprocessingPipeline') + logger.setLevel(logging.DEBUG) + + if not logger.handlers: # Avoid duplicate handlers in repeated runs + ch = logging.StreamHandler() + ch.setLevel(logging.INFO) + formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + ch.setFormatter(formatter) + logger.addHandler(ch) + + fh = logging.FileHandler(log_path) + fh.setLevel(logging.DEBUG) + fh.setFormatter(formatter) + logger.addHandler(fh) + + return logger + +def load_tabular_data(path, data_type='csv'): + if data_type == 'csv': + return pd.read_csv(path) + elif data_type == 'json': + return pd.read_json(path) + else: + raise ValueError(f"Unsupported tabular type: {data_type}") + +def main(): + logger = setup_logger() + logger.info("Starting preprocessing pipeline") + + metadata = { + 'run_id': datetime.now().strftime('%Y%m%d_%H%M%S'), + 'start_time': datetime.now().isoformat(), + 'steps': [] + } + + def log_step(step_name, info): + metadata['steps'].append({ + 'step': step_name, + 'timestamp': datetime.now().isoformat(), + 'info': info + }) + + # Load config + with open("config.yaml") as f: + config = yaml.safe_load(f) + logger.info("Loaded config.yaml") + + # TABULAR + if 'tabular' in config: + tab_cfg = config['tabular'] + logger.info(f"Loading tabular data from {tab_cfg['path']}") + df = load_tabular_data(tab_cfg['path'], tab_cfg.get('type', 'csv')) + logger.info(f"Original tabular shape: {df.shape}") + + processed_df, tab_metadata = tabular.preprocess_tabular(df, tab_cfg['preprocessing'], logger=logger) + logger.info(f"Processed tabular shape: {processed_df.shape}") + + if 'output_folder' in tab_cfg: + os.makedirs(tab_cfg['output_folder'], exist_ok=True) + save_path = os.path.join(tab_cfg['output_folder'], "processed_tabular.csv") + processed_df.to_csv(save_path, index=False) + logger.info(f"Saved processed tabular data to {save_path}") + + log_step('tabular_preprocessing', { + 'input_shape': df.shape, + 'output_shape': processed_df.shape, + 'output_folder': tab_cfg.get('output_folder'), + 'metadata': tab_metadata + }) + + # IMAGES + if 'images' in config: + img_cfg = config['images']['preprocessing'] + img_path = config['images']['path'] + output_folder = config['images'].get('output_folder') + + logger.info(f"Processing images from {img_path}") + imgs = images.preprocess_images(img_path, img_cfg, logger=logger) + logger.info(f"Processed {len(imgs)} images.") + + if output_folder: + img_metadata = images.save_processed_images(imgs, output_folder, save_as="png", logger=logger) + logger.info(f"Saved processed images and metadata to {output_folder}") + else: + img_metadata = [] + + log_step('image_preprocessing', { + 'num_images_processed': len(imgs), + 'output_folder': output_folder, + 'metadata': img_metadata + }) + + # VIDEOS + if 'videos' in config: + vid_cfg = config['videos']['preprocessing'] + vid_path = config['videos']['path'] + output_folder = config['videos'].get('output_folder') + + logger.info(f"Processing video from {vid_path}") + video_metadata_dict = {} + frames = videos.preprocess_video(vid_path, vid_cfg, logger=logger, metadata=video_metadata_dict) + logger.info(f"Processed {len(frames)} video frames.") + + if output_folder: + vid_metadata = videos.save_processed_video_frames(frames, output_folder, logger=logger) + logger.info(f"Saved video frames and metadata to {output_folder}") + else: + vid_metadata = [] + + log_step('video_preprocessing', { + 'num_frames_processed': len(frames), + 'output_folder': output_folder, + 'metadata': vid_metadata, + **video_metadata_dict + }) + + # Save metadata + metadata['end_time'] = datetime.now().isoformat() + metadata_path = f"metadata_{metadata['run_id']}.json" + with open(metadata_path, 'w') as f: + json.dump(metadata, f, indent=4) + logger.info(f"Saved pipeline metadata to {metadata_path}") + + logger.info("Pipeline finished successfully") + +if __name__ == "__main__": + main() diff --git a/File Upload Service/app/requirements.txt b/File Upload Service/app/requirements.txt index 157470f..1fddcfa 100644 --- a/File Upload Service/app/requirements.txt +++ b/File Upload Service/app/requirements.txt @@ -1,4 +1,7 @@ -streamlit==1.25.0 -minio==7.1.11 -python-dotenv==1.0.0 -pyspark==3.5.0 \ No newline at end of file +PyYAML>=6.0 +pandas>=1.3 +numpy>=1.21 +scikit-learn>=1.0 +scipy>=1.7 +Pillow>=9.0 +streamlit>=1.48 diff --git a/File Upload Service/app/tabular.py b/File Upload Service/app/tabular.py new file mode 100644 index 0000000..80b33bf --- /dev/null +++ b/File Upload Service/app/tabular.py @@ -0,0 +1,303 @@ +import pandas as pd +import numpy as np +from sklearn.preprocessing import MinMaxScaler, StandardScaler, LabelEncoder +from scipy.stats import zscore +import os + +# ------------------------------ +# Missing Values +# ------------------------------ +def handle_missing_values(df, missing_cfg, logger=None): + meta = {'missing_values_filled': 0} + if logger: + logger.info("Handling missing values") + + before_na = df.isna().sum().sum() + + if 'global_fill' in missing_cfg and missing_cfg['global_fill'] is not None: + df = df.fillna(missing_cfg['global_fill']) + if logger: + logger.debug(f"Filled all missing values with: {missing_cfg['global_fill']}") + + if 'columns' in missing_cfg: + for col, val in missing_cfg['columns'].items(): + if col in df.columns: + df[col] = df[col].fillna(val) + if logger: + logger.debug(f"Filled missing values in column '{col}' with: {val}") + + after_na = df.isna().sum().sum() + meta['missing_values_filled'] = before_na - after_na + return df, meta + +# ------------------------------ +# Normalization +# ------------------------------ +def normalize_columns(df, norm_cfg, logger=None): + meta = {} + method = norm_cfg.get('method', 'minmax') + cols = norm_cfg.get('columns', []) + + if not cols: + if logger: + logger.info("No columns specified for normalization; skipping") + return df, meta + + if logger: + logger.info(f"Normalizing columns {cols} using method '{method}'") + + if method == 'minmax': + scaler = MinMaxScaler() + elif method == 'standard': + scaler = StandardScaler() + else: + if logger: + logger.warning(f"Unknown normalization method '{method}'; skipping normalization") + return df, meta + + df[cols] = scaler.fit_transform(df[cols]) + meta['normalized_columns'] = cols + meta['normalization_method'] = method + return df, meta + +# ------------------------------ +# Encoding +# ------------------------------ +def encode_categorical(df, encode_cfg, logger=None): + meta = {} + method = encode_cfg.get('method', 'onehot') + cols = encode_cfg.get('columns', []) + + if logger: + logger.info(f"Encoding categorical columns {cols} using method '{method}'") + + if method == 'label': + for col in cols: + if col in df.columns: + le = LabelEncoder() + df[col] = le.fit_transform(df[col].astype(str)) + if logger: + logger.debug(f"Label encoded column '{col}'") + meta['encoded_columns'] = cols + meta['encoding_method'] = 'label' + + elif method == 'onehot': + existing_cols = [c for c in cols if c in df.columns] + df = pd.get_dummies(df, columns=existing_cols) + meta['encoded_columns'] = existing_cols + meta['encoding_method'] = 'onehot' + if logger: + logger.debug(f"One-hot encoded columns {existing_cols}") + + else: + if logger: + logger.warning(f"Unknown encoding method '{method}'; skipping encoding") + + return df, meta + +# ------------------------------ +# Outlier Removal +# ------------------------------ +def remove_outliers(df, outlier_cfg, logger=None): + meta = {'outliers_removed': 0} + method = outlier_cfg.get('method', 'iqr') + cols = outlier_cfg.get('columns', []) + threshold = outlier_cfg.get('threshold', 1.5) + + if logger: + logger.info(f"Removing outliers using method '{method}' on columns {cols} with threshold {threshold}") + + before_rows = df.shape[0] + + if method == 'iqr': + for col in cols: + if col in df.columns: + Q1 = df[col].quantile(0.25) + Q3 = df[col].quantile(0.75) + IQR = Q3 - Q1 + lower_bound = Q1 - threshold * IQR + upper_bound = Q3 + threshold * IQR + df = df[(df[col] >= lower_bound) & (df[col] <= upper_bound)] + + elif method == 'zscore': + for col in cols: + if col in df.columns: + z_scores = np.abs(zscore(df[col])) + df = df[z_scores < threshold] + + else: + if logger: + logger.warning(f"Unknown outlier removal method '{method}'; skipping") + + after_rows = df.shape[0] + meta['outliers_removed'] = before_rows - after_rows + return df, meta + +# ------------------------------ +# Cleaning +# ------------------------------ +def clean_data(df, cleaning_cfg, logger=None): + meta = {} + if logger: + logger.info("Cleaning data") + + if cleaning_cfg.get('trim_strings', False): + df = df.applymap(lambda x: x.strip() if isinstance(x, str) else x) + meta['trim_strings'] = True + if logger: + logger.debug("Trimmed strings in dataframe") + + if cleaning_cfg.get('lowercase', False): + df = df.applymap(lambda x: x.lower() if isinstance(x, str) else x) + meta['lowercase'] = True + if logger: + logger.debug("Lowercased strings in dataframe") + + return df, meta + +# ------------------------------ +# Column Filtering +# ------------------------------ +def filter_columns(df, filter_cfg, logger=None): + meta = {} + if 'keep' in filter_cfg and filter_cfg['keep']: + df = df.loc[:, filter_cfg['keep']] + meta['columns_kept'] = filter_cfg['keep'] + if logger: + logger.info(f"Filtered columns, keeping only: {filter_cfg['keep']}") + elif 'drop' in filter_cfg and filter_cfg['drop']: + df = df.drop(columns=filter_cfg['drop'], errors='ignore') + meta['columns_dropped'] = filter_cfg['drop'] + if logger: + logger.info(f"Dropped columns: {filter_cfg['drop']}") + return df, meta + +# ------------------------------ +# Dtype Conversion +# ------------------------------ +def convert_dtypes(df, dtype_cfg, logger=None): + meta = {'dtype_conversions': {}} + for col, dtype in dtype_cfg.items(): + if col in df.columns: + try: + df[col] = df[col].astype(dtype) + meta['dtype_conversions'][col] = dtype + if logger: + logger.debug(f"Converted column '{col}' to dtype '{dtype}'") + except Exception as e: + warn_msg = f"Warning: could not convert column {col} to {dtype}: {e}" + if logger: + logger.warning(warn_msg) + else: + print(warn_msg) + return df, meta + +# ------------------------------ +# Remove Empty Columns +# ------------------------------ +def remove_empty_columns(df, logger=None): + empty_cols = [col for col in df.columns if df[col].isna().all()] + if empty_cols: + df = df.drop(columns=empty_cols) + if logger: + logger.info(f"Removed empty columns: {empty_cols}") + return df, {'empty_columns_removed': empty_cols} + +# ------------------------------ +# Add Row IDs +# ------------------------------ +def add_row_ids(df, logger=None): + df.insert(0, 'row_id', range(1, len(df) + 1)) + if logger: + logger.info("Added unique row_id column") + return df, {'row_id_added': True} + +# ------------------------------ +# Drop Duplicates +# ------------------------------ +def drop_duplicates(df, drop_cfg=True, logger=None): + meta = {'duplicates_removed': 0} + if drop_cfg: + before_rows = df.shape[0] + df = df.drop_duplicates() + after_rows = df.shape[0] + meta['duplicates_removed'] = before_rows - after_rows + if logger: + logger.info(f"Removed {meta['duplicates_removed']} duplicate rows") + return df, meta + +# ------------------------------ +# Rename Columns +# ------------------------------ +def rename_columns(df, rename_cfg, logger=None): + meta = {} + if rename_cfg: + df = df.rename(columns=rename_cfg) + meta['columns_renamed'] = rename_cfg + if logger: + logger.info(f"Renamed columns: {rename_cfg}") + return df, meta + +# ------------------------------ +# Main Preprocess +# ------------------------------ +def preprocess_tabular(df, cfg, logger=None): + metadata = [] + + def record_step(name, meta): + metadata.append({"step": name, **meta}) + + if cfg.get('remove_empty_columns', False): + df, meta = remove_empty_columns(df, logger=logger) + record_step('remove_empty_columns', meta) + + if cfg.get('add_row_id', False): + df, meta = add_row_ids(df, logger=logger) + record_step('add_row_id', meta) + + if cfg.get('drop_duplicates', False): + df, meta = drop_duplicates(df, logger=logger) + record_step('drop_duplicates', meta) + + if 'rename_columns' in cfg: + df, meta = rename_columns(df, cfg['rename_columns'], logger=logger) + record_step('rename_columns', meta) + + if 'missing_values' in cfg: + df, meta = handle_missing_values(df, cfg['missing_values'], logger=logger) + record_step('handle_missing_values', meta) + + if 'normalization' in cfg: + df, meta = normalize_columns(df, cfg['normalization'], logger=logger) + record_step('normalize_columns', meta) + + if 'categorical_encoding' in cfg: + df, meta = encode_categorical(df, cfg['categorical_encoding'], logger=logger) + record_step('encode_categorical', meta) + + if 'outlier_removal' in cfg: + df, meta = remove_outliers(df, cfg['outlier_removal'], logger=logger) + record_step('remove_outliers', meta) + + if 'cleaning' in cfg: + df, meta = clean_data(df, cfg['cleaning'], logger=logger) + record_step('clean_data', meta) + + if 'column_filtering' in cfg: + df, meta = filter_columns(df, cfg['column_filtering'], logger=logger) + record_step('filter_columns', meta) + + if 'dtype_conversion' in cfg: + df, meta = convert_dtypes(df, cfg['dtype_conversion'], logger=logger) + record_step('convert_dtypes', meta) + + return df, metadata + +# ------------------------------ +# Save Tabular Output +# ------------------------------ +def save_processed_tabular(df, output_folder, logger=None): + os.makedirs(os.path.dirname(output_folder), exist_ok=True) + df.to_csv(output_folder, index=False) + if logger: + logger.info(f"Saved processed tabular data to {output_folder}") diff --git a/File Upload Service/app/videos.py b/File Upload Service/app/videos.py new file mode 100644 index 0000000..cde5fc3 --- /dev/null +++ b/File Upload Service/app/videos.py @@ -0,0 +1,47 @@ +import os +import pandas as pd +from PIL import Image +import numpy as np +from datetime import datetime + +def save_processed_video_frames(frames, output_folder, logger=None): + os.makedirs(output_folder, exist_ok=True) + + metadata = [] + saved_count = 0 + for idx, frame in enumerate(frames): + # Convert float32 RGB array [0,1] back to uint8 [0,255] + frame_uint8 = (frame * 255).astype(np.uint8) + img = Image.fromarray(frame_uint8) + + filename = f'frame_{idx:04d}.png' + out_path = os.path.join(output_folder, filename) + try: + img.save(out_path) + saved_count += 1 + if logger: + logger.debug(f"Saved frame {idx} to {out_path}") + # Collect metadata + metadata.append({ + "filename": filename, + "width": img.width, + "height": img.height, + "timestamp": datetime.now().isoformat(), + "processed_path": out_path + }) + except Exception as e: + if logger: + logger.error(f"Error saving frame {idx}: {e}") + + if logger: + logger.info(f"Saved {saved_count} frames to folder: {output_folder}") + + # Save metadata CSV + metadata_df = pd.DataFrame(metadata) + metadata_csv_path = os.path.join(output_folder, "metadata.csv") + metadata_df.to_csv(metadata_csv_path, index=False) + + if logger: + logger.info(f"Saved video frames metadata to {metadata_csv_path}") + + return metadata diff --git a/README.md b/README.md index bf8ee23..582c0c9 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,23 @@ # redback-data-warehouse Data Warehouse storage of code and configurations + +## Garmin Run Data – ETL Pipeline Update + +This ETL pipeline processes `Garmin_run_data.csv` and includes: + +### Data cleaning: +- Removes duplicate rows +- Standardizes column names (lowercase, underscores) +- Converts timestamps to datetime +- Fills missing numeric values with column means +- Removes outliers in `heart_rate` (keeps values between 30–220 bpm) +- Converts distance from meters to kilometers +- Converts speed from m/s to km/h + +### Data aggregation: +- Groups data by year and week +- Calculates total runs, total distance (km), average speed (km/h), and average pace (min/km) per week + +### Outputs: +- `cleaned_garmin_run_data.csv` → cleaned dataset + diff --git a/Requirement Gathering (4).pdf b/Requirement Gathering (4).pdf new file mode 100644 index 0000000..aaa2831 Binary files /dev/null and b/Requirement Gathering (4).pdf differ diff --git a/etl_scripts/ETL pipeline.ipynb b/etl_scripts/ETL pipeline.ipynb new file mode 100644 index 0000000..cae4104 --- /dev/null +++ b/etl_scripts/ETL pipeline.ipynb @@ -0,0 +1,239 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "27156c5d-4dfb-4f95-a4e6-d88976d9c7c5", + "metadata": {}, + "source": [ + "# Importing required libraries\r\n", + "# We use pandas for data manipulation and matplotlib for visualization" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "ced94ba9-4fef-457e-9c36-855cc56c90bb", + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd\n", + "import matplotlib.pyplot as plt\n" + ] + }, + { + "cell_type": "markdown", + "id": "0501979d-4574-4efc-8409-43de021a8bb6", + "metadata": {}, + "source": [ + "# Extract – Load the Garmin running data\r\n", + "# Reading the original raw CSV file" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "5445f384-8e1a-4d1a-9936-49f6ae3cbacb", + "metadata": {}, + "outputs": [], + "source": [ + "df_raw = pd.read_csv(\"Garmin_run_data.csv\")" + ] + }, + { + "cell_type": "markdown", + "id": "6221e8e8-af73-4fbe-be56-d992d7f63132", + "metadata": {}, + "source": [ + "# Transform – Cleaning and enhancing the data" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "368a5cac-e8e5-444b-8dbe-9d96a7581f3d", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "✅ Cleaned data and weekly stats saved.\n" + ] + } + ], + "source": [ + "import pandas as pd\n", + "\n", + "# ============================\n", + "# 📥 Load raw data\n", + "# ============================\n", + "df_raw = pd.read_csv(\"Garmin_run_data.csv\")\n", + "\n", + "# ============================\n", + "# 🧹 Data Cleaning\n", + "# ============================\n", + "\n", + "# 1. Remove duplicate rows\n", + "df_cleaned = df_raw.drop_duplicates()\n", + "\n", + "# 2. Standardize column names (lowercase, underscores)\n", + "df_cleaned.columns = [col.strip().lower().replace(\" \", \"_\") for col in df_cleaned.columns]\n", + "\n", + "# 3. Convert timestamps to datetime\n", + "if 'timestamp' in df_cleaned.columns:\n", + " df_cleaned['timestamp'] = pd.to_datetime(df_cleaned['timestamp'], errors='coerce')\n", + "\n", + "# 4. Fill missing numeric values with column means\n", + "numeric_cols = df_cleaned.select_dtypes(include='number').columns\n", + "df_cleaned[numeric_cols] = df_cleaned[numeric_cols].fillna(df_cleaned[numeric_cols].mean())\n", + "\n", + "# 5. Remove outliers in heart_rate (keep values between 30 and 220 bpm)\n", + "if 'heart_rate' in df_cleaned.columns:\n", + " df_cleaned = df_cleaned[(df_cleaned['heart_rate'] >= 30) & (df_cleaned['heart_rate'] <= 220)]\n", + "\n", + "# 6. Unit conversion: meters to kilometers\n", + "if 'distance' in df_cleaned.columns:\n", + " df_cleaned['distance_km'] = df_cleaned['distance'] / 1000\n", + "\n", + "# 7. Unit conversion: speed from m/s to km/h\n", + "if 'speed' in df_cleaned.columns:\n", + " df_cleaned['speed_kmh'] = df_cleaned['speed'] * 3.6\n", + "\n", + "# ============================\n", + "# 📊 Data Aggregation (Weekly Stats)\n", + "# ============================\n", + "\n", + "if 'timestamp' in df_cleaned.columns:\n", + " # Extract week, month, year for grouping\n", + " df_cleaned['week'] = df_cleaned['timestamp'].dt.isocalendar().week\n", + " df_cleaned['month'] = df_cleaned['timestamp'].dt.month\n", + " df_cleaned['year'] = df_cleaned['timestamp'].dt.year\n", + "\n", + " # Group by year + week to compute stats\n", + " weekly_stats = df_cleaned.groupby(['year', 'week']).agg(\n", + " total_runs=('timestamp', 'count'),\n", + " total_distance_km=('distance_km', 'sum'),\n", + " average_speed_kmh=('speed_kmh', 'mean')\n", + " ).reset_index()\n", + "\n", + " # Calculate average pace (min/km) if speed exists\n", + " if 'average_speed_kmh' in weekly_stats.columns:\n", + " weekly_stats['average_pace_min_per_km'] = 60 / weekly_stats['average_speed_kmh']\n", + "\n", + "# ============================\n", + "# 💾 Save outputs\n", + "# ============================\n", + "\n", + "# Save cleaned data\n", + "df_cleaned.to_csv(\"cleaned_garmin_run_data.csv\", index=False)\n", + "\n", + "# Save weekly statistics (if generated)\n", + "if 'weekly_stats' in locals():\n", + " weekly_stats.to_csv(\"weekly_stats_garmin_run_data.csv\", index=False)\n", + "\n", + "print(\"✅ Cleaned data and weekly stats saved.\")\n" + ] + }, + { + "cell_type": "markdown", + "id": "1eb98d8e-3551-4b80-962a-810059c40992", + "metadata": {}, + "source": [ + "# Visualize – Ploting distributions for insight" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "6425c501-52f4-44eb-b1a3-46c5ecec29dc", + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "# Plot heart rate distribution\n", + "if 'heart_rate' in df_cleaned.columns:\n", + " plt.figure(figsize=(8, 4))\n", + " plt.hist(df_cleaned['heart_rate'], bins=30, color='skyblue', edgecolor='black')\n", + " plt.title(\"Heart Rate Distribution\")\n", + " plt.xlabel(\"Heart Rate (bpm)\")\n", + " plt.ylabel(\"Frequency\")\n", + " plt.grid(True)\n", + " plt.show()\n", + "\n", + "# Plot distance (km) if available\n", + "if 'distance_km' in df_cleaned.columns:\n", + " plt.figure(figsize=(8, 4))\n", + " plt.hist(df_cleaned['distance_km'], bins=20, color='lightgreen', edgecolor='black')\n", + " plt.title(\"Distance Distribution (km)\")\n", + " plt.xlabel(\"Distance (km)\")\n", + " plt.ylabel(\"Frequency\")\n", + " plt.grid(True)\n", + " plt.show()\n" + ] + }, + { + "cell_type": "markdown", + "id": "f458eeb5-f7a3-4ed7-a260-0dc2c3bfb0d7", + "metadata": {}, + "source": [ + "# Load – Saved the cleaned dataset to a new CSV" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "4581ef6c-925f-4cc9-87f5-f31aaaaac393", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "✅ Cleaned data saved to 'cleaned_garmin_run_data.csv'\n" + ] + } + ], + "source": [ + "df_cleaned.to_csv(\"cleaned_garmin_run_data.csv\", index=False)\n", + "print(\"✅ Cleaned data saved to 'cleaned_garmin_run_data.csv'\")\n" + ] + }, + { + "cell_type": "markdown", + "id": "98056b45-85a5-4848-9014-5854c922c2af", + "metadata": {}, + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.7" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}