From a09bcdb9d4db87b21880efb27d393f24d3e50908 Mon Sep 17 00:00:00 2001
From: Shimron Rozario
Date: Wed, 10 Sep 2025 17:27:05 +1000
Subject: [PATCH] Add provenance feature with MinIO integration

---
Review notes (this section is ignored by `git am`):

 * The line breaks, indentation and blank lines of this patch were rebuilt
   from the hunk line counts and Python syntax. Whitespace-only differences
   (the blank-line / "# Tab 2" replacement pairs, spacing before the removed
   inline comments) could not be recovered, so run `git apply --check`
   against the target tree before using it.
 * is_valid_url: now accepts TLDs up to 63 characters and an optional
   ":port" (previously "{2,6}" and no port, which rejected e.g.
   http://host.example:9000/x).
 * upload_provenance_to_minio: the sidecar is stored with
   content_type="application/json".
 * upload_time is recorded as a timezone-aware UTC timestamp.
 * The "signature" field is an unkeyed SHA-256 digest of the entry; the
   comment now says so. The key name is unchanged.
 * Tab 4: the get_object response is closed and released in a finally block.
 * Open issue, not changed here: upload_to_minio handles S3Error itself, and
   the loop writes the provenance sidecar unconditionally, so a provenance
   log can be created for a file whose upload failed.
 * Hunk offsets and the diffstat were recomputed for the added lines; the
   post-image blob id on the "index" line still refers to the original
   version of this patch.

 File Upload Service/app/streamlitdw_fe_mt.py | 126 ++++++++++++++++---
 1 file changed, 108 insertions(+), 18 deletions(-)

diff --git a/File Upload Service/app/streamlitdw_fe_mt.py b/File Upload Service/app/streamlitdw_fe_mt.py
index 0846ce4..349de4b 100644
--- a/File Upload Service/app/streamlitdw_fe_mt.py
+++ b/File Upload Service/app/streamlitdw_fe_mt.py
@@ -8,6 +8,9 @@ import datetime
 import subprocess
 import pandas as pd
 
+import json
+import hashlib
+import re
 # Load environment variables
 load_dotenv()
 
@@ -35,6 +38,21 @@ def validate_filename(name):
     return name.isalnum()
 
 
+def is_valid_url(url: str) -> bool:
+    """Return True if *url* looks like an http(s) URL or a bare domain name.
+
+    The scheme is optional; the host must be a dotted domain name, optionally
+    followed by a port and a path. Surrounding whitespace is ignored.
+    """
+    regex = re.compile(
+        r'^(https?://)?'
+        r'(([a-zA-Z0-9_-]+\.)+[a-zA-Z]{2,63})'
+        r'(:\d{1,5})?'
+        r'(/[^\s]*)?$',
+        re.IGNORECASE
+    )
+    return bool(regex.match(url.strip()))
+
 # Generate custom filename with suffix and prefix to enforce governance
 def generate_custom_filename(project, base_name, original_filename, add_prefix_suffix):
     file_extension = original_filename.split(".")[-1]
@@ -45,7 +63,6 @@ def generate_custom_filename(project, base_name, original_filename, add_prefix_s
         custom_filename = f"{base_name}.{file_extension}"
     return custom_filename
 
-
 def upload_to_minio(file, filename, bucket_name):
     try:
         data = file.read()
@@ -55,6 +72,24 @@ def upload_to_minio(file, filename, bucket_name):
     except S3Error as e:
         st.error(f"Failed to upload {filename} to {bucket_name}: {e}")
 
+def upload_provenance_to_minio(provenance, custom_name, bucket):
+    """Store *provenance* as "{custom_name}.provenance.json" in *bucket*.
+
+    The dict is serialised as indented UTF-8 JSON. Success and S3 errors are
+    reported through the Streamlit UI rather than raised.
+    """
+    try:
+        prov_bytes = json.dumps(provenance, indent=4).encode("utf-8")
+        minio_client.put_object(
+            bucket,
+            f"{custom_name}.provenance.json",
+            io.BytesIO(prov_bytes),
+            len(prov_bytes),
+            content_type="application/json",
+        )
+        st.success(f"Provenance uploaded to MinIO: {custom_name}.provenance.json")
+    except S3Error as e:
+        st.error(f"Failed to upload provenance for {custom_name}: {e}")
 
 def trigger_etl(filename, preprocessing_option):
     """Trigger the ETL pipeline with the selected preprocessing option."""
@@ -72,7 +107,6 @@ def trigger_etl(filename, preprocessing_option):
         st.error(f"Failed to execute ETL pipeline for: {filename}")
         st.text(f"ETL Error Output: {e.stderr}")
 
-
 def get_file_list(bucket):
     try:
         # Flask API to access the list of data from the VM
@@ -109,9 +143,8 @@ def main():
     if "uploaded_filenames" not in st.session_state:
         st.session_state.uploaded_filenames = []
 
-
-    # Create tabs for File Upload, Bronze, and Silver
-    tabs = st.tabs(["File Upload & ETL", "View Original Files", "View Pre-processed Files"])
+    # Create tabs for File Upload, Bronze, Silver, Provenance
+    tabs = st.tabs(["File Upload & ETL", "View Original Files", "View Pre-processed Files", "View Provenance Logs"])
 
     # Tab 1: File Upload & ETL
     with tabs[0]:
@@ -121,6 +154,8 @@ def main():
         num_files = st.number_input("Number of files to upload", 1, 10, 1)
         preprocessing = st.selectbox("Preprocessing (optional)", options=["No Pre-processing", "Data Clean Up", "Preprocessing for Machine Learning"])
         add_prefix = st.checkbox("Add project as prefix and date as suffix to filename (to overwrite existing files)", value=True)
+        provenance_source = st.text_input("Provenance Source (e.g., Wikipedia, Internal DB, Kaggle)")
+        source_url = st.text_input("Source URL (optional)")
 
         uploaded_files = []
         base_names = []
@@ -148,27 +183,57 @@ def main():
                 st.warning("Please select at least one file.")
             elif not valid_basenames:
                 st.warning("Please fix invalid base names.")
+            elif provenance_source.strip() == "":
+                st.warning("Please enter a provenance source before uploading.")
+            elif source_url and not is_valid_url(source_url):
+                st.warning("The Source URL format appears to be invalid. Please enter a valid URL.")
             else:
                 st.session_state.uploaded_filenames = []
                 for idx, file in enumerate(uploaded_files):
                     custom_name = generate_custom_filename(project, base_names[idx], file.name, add_prefix)
+
+                    # Upload file to MinIO
                     upload_to_minio(file, custom_name, bucket_name_bronze)
                     st.session_state.uploaded_filenames.append(custom_name)
 
+                    # Build provenance entry
+                    new_entry = {
+                        "upload_time": datetime.datetime.now(datetime.timezone.utc).isoformat(),
+                        "provenance_source": provenance_source,
+                        "source_url": source_url,
+                        "preprocessing": preprocessing,
+                        "uploaded_by": os.getenv("USERNAME") or os.getenv("USER") or "unknown"
+                    }
+
+                    # Add an integrity digest (plain SHA-256 of the entry; not a keyed signature)
+                    entry_str = json.dumps(new_entry, sort_keys=True)
+                    new_entry["signature"] = hashlib.sha256(entry_str.encode("utf-8")).hexdigest()
+
+                    # Provenance structure
+                    provenance = {
+                        "filename": custom_name,
+                        "original_filename": file.name,
+                        "project": project,
+                        "bucket": bucket_name_bronze,
+                        "history": [new_entry]
+                    }
+
+                    # Upload provenance JSON
+                    upload_provenance_to_minio(provenance, custom_name, bucket_name_bronze)
+
         # Option to trigger ETL after all uploads
         if st.session_state.uploaded_filenames:
             if st.button("Triggering ETL for All Uploaded Files"):
                 for filename in st.session_state.uploaded_filenames:
                     trigger_etl(filename, preprocessing)
-
-    # Tab 2: View Bronze Files
+
+    # Tab 2: View Bronze Files
     with tabs[1]:
         st.header("Uploaded Files Overview - Bronze (dw-bucket-bronze)")
-        # Get the list of files from the "dw-bucket-bronze" bucket
         files_by_project = get_file_list("dw-bucket-bronze")
 
         if files_by_project:
-            available_projects = list(files_by_project.keys())  # Get project names (folders)
+            available_projects = list(files_by_project.keys())
             selected_project = st.selectbox("Select Project Folder", available_projects)
 
             if selected_project in files_by_project:
@@ -176,7 +241,7 @@ def main():
 
                 if file_list:
                     df = pd.DataFrame(file_list)
-                    st.dataframe(df)  # Display the table with the filtered list of files
+                    st.dataframe(df)
 
                     selected_file = st.selectbox("Select File to Download", df["File"].tolist())
 
@@ -184,32 +249,57 @@ def main():
                         file_content = download_file("dw-bucket-bronze", selected_project, selected_file)
                         if file_content:
                             st.download_button(label=f"Download {selected_file}", data=file_content, file_name=selected_file.split("/")[-1])
-
-
+
     # Tab 3: View Silver Files
     with tabs[2]:
         st.header("Uploaded Files Overview - Silver (dw-bucket-silver)")
-        # Get the list of files from the "dw-bucket-silver" bucket
         files_by_project = get_file_list("dw-bucket-silver")
 
         if files_by_project:
-            available_projects = list(files_by_project.keys())  # Get project names (folders)
-            selected_project = st.selectbox("Select Project Folder", available_projects)
+            available_projects = list(files_by_project.keys())
+            selected_project = st.selectbox("Select Project Folder", available_projects, key="silver_project_select")
 
             if selected_project in files_by_project:
                 file_list = [{"Project": selected_project, "File": file} for file in files_by_project[selected_project]]
 
                 if file_list:
                     df = pd.DataFrame(file_list)
-                    st.dataframe(df)  # Display the table with the filtered list of files
+                    st.dataframe(df)
 
-                    selected_file = st.selectbox("Select File to Download", df["File"].tolist())
+                    selected_file = st.selectbox("Select File to Download", df["File"].tolist(), key="silver_file_select")
 
                     if st.button("Download Selected File from Silver"):
                         file_content = download_file("dw-bucket-silver", selected_project, selected_file)
                         if file_content:
                             st.download_button(label=f"Download {selected_file}", data=file_content, file_name=selected_file.split("/")[-1])
-
+
+    # Tab 4: View Provenance Logs
+    with tabs[3]:
+        st.header("Provenance Logs (MinIO)")
+        try:
+            objects = minio_client.list_objects(bucket_name_bronze, recursive=True)
+            provenance_files = [obj.object_name for obj in objects if obj.object_name.endswith(".provenance.json")]
+        except S3Error as e:
+            provenance_files = []
+            st.error(f"Failed to list provenance files: {e}")
+
+        if provenance_files:
+            selected_log = st.selectbox("Select a provenance log", provenance_files)
+            if selected_log:
+                response = None
+                try:
+                    response = minio_client.get_object(bucket_name_bronze, selected_log)
+                    data = json.load(response)
+                    st.json(data)
+                except Exception as e:
+                    st.error(f"Failed to load provenance log: {e}")
+                finally:
+                    # Hand the HTTP connection back to the pool once the body is read.
+                    if response is not None:
+                        response.close()
+                        response.release_conn()
+        else:
+            st.info("No provenance logs found in MinIO.")
 
 if __name__ == "__main__":
     main()