Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 90 additions & 18 deletions File Upload Service/app/streamlitdw_fe_mt.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
import datetime
import subprocess
import pandas as pd
import json
import hashlib
import re

# Load environment variables
load_dotenv()
Expand Down Expand Up @@ -35,6 +38,15 @@
def validate_filename(name):
    """Report whether ``name`` is acceptable as a base filename.

    A name is accepted only when it is non-empty and every character is
    alphanumeric (as defined by ``str.isalnum``); spaces, dots, dashes and
    underscores therefore make a name invalid.
    """
    is_alphanumeric = name.isalnum()
    return is_alphanumeric

# Compiled once at import time and matched against the whole stripped input.
_URL_PATTERN = re.compile(
    r'(https?://)?'                         # optional http/https scheme
    r'(([a-zA-Z0-9_-]+\.)+[a-zA-Z]{2,63})'  # dotted host ending in an alphabetic TLD
    r'(:\d{1,5})?'                          # optional port, e.g. :9000
    r'([/?#][^\s]*)?',                      # optional path, query string or fragment
    re.IGNORECASE,
)


def is_valid_url(url: str) -> bool:
    """Return True when ``url`` looks like an HTTP(S) URL or a bare domain.

    Surrounding whitespace is ignored. The scheme is optional, so both
    ``https://example.com/data`` and ``example.com/data`` are accepted.
    The host must contain at least one dot and end in an alphabetic
    top-level domain of 2 to 63 letters (63 is the DNS label limit), so
    newer TLDs such as ``.technology`` are accepted. An optional port and
    an optional path, query string or fragment may follow the host; none
    of them may contain whitespace.

    This is a format check only: nothing is resolved or fetched.

    Args:
        url: The user-supplied URL text.

    Returns:
        True if the stripped text matches the expected URL shape,
        otherwise False (including for an empty string).
    """
    return _URL_PATTERN.fullmatch(url.strip()) is not None

# Generate custom filename with suffix and prefix to enforce governance
def generate_custom_filename(project, base_name, original_filename, add_prefix_suffix):
file_extension = original_filename.split(".")[-1]
Expand All @@ -45,7 +57,6 @@ def generate_custom_filename(project, base_name, original_filename, add_prefix_s
custom_filename = f"{base_name}.{file_extension}"
return custom_filename


def upload_to_minio(file, filename, bucket_name):
try:
data = file.read()
Expand All @@ -55,6 +66,18 @@ def upload_to_minio(file, filename, bucket_name):
except S3Error as e:
st.error(f"Failed to upload {filename} to {bucket_name}: {e}")

def upload_provenance_to_minio(provenance, custom_name, bucket):
    """Store a provenance record next to its data file in MinIO.

    The record is serialised as indented UTF-8 JSON and written to
    ``bucket`` under the object name ``<custom_name>.provenance.json``
    using the module-level ``minio_client``. The outcome is reported in
    the Streamlit UI: a success message on upload, an error message when
    MinIO raises ``S3Error``. Other exceptions (for example a record
    that is not JSON-serialisable) are not caught here.

    Args:
        provenance: JSON-serialisable provenance record.
        custom_name: Object name of the uploaded data file; the
            provenance object name is derived from it.
        bucket: Name of the destination bucket.

    Returns:
        None.
    """
    object_name = f"{custom_name}.provenance.json"
    try:
        prov_bytes = json.dumps(provenance, indent=4).encode("utf-8")
        minio_client.put_object(
            bucket,
            object_name,
            io.BytesIO(prov_bytes),
            len(prov_bytes),
            # Label the object as JSON instead of the SDK's generic
            # application/octet-stream default.
            content_type="application/json",
        )
        st.success(f"Provenance uploaded to MinIO: {custom_name}.provenance.json")
    except S3Error as e:
        st.error(f"Failed to upload provenance for {custom_name}: {e}")

def trigger_etl(filename, preprocessing_option):
"""Trigger the ETL pipeline with the selected preprocessing option."""
Expand All @@ -72,7 +95,6 @@ def trigger_etl(filename, preprocessing_option):
st.error(f"Failed to execute ETL pipeline for: {filename}")
st.text(f"ETL Error Output: {e.stderr}")


def get_file_list(bucket):
try:
# Flask API to access the list of data from the VM
Expand Down Expand Up @@ -109,9 +131,8 @@ def main():
if "uploaded_filenames" not in st.session_state:
st.session_state.uploaded_filenames = []


# Create tabs for File Upload, Bronze, and Silver
tabs = st.tabs(["File Upload & ETL", "View Original Files", "View Pre-processed Files"])
# Create tabs for File Upload, Bronze, Silver, Provenance
tabs = st.tabs(["File Upload & ETL", "View Original Files", "View Pre-processed Files", "View Provenance Logs"])

# Tab 1: File Upload & ETL
with tabs[0]:
Expand All @@ -121,6 +142,8 @@ def main():
num_files = st.number_input("Number of files to upload", 1, 10, 1)
preprocessing = st.selectbox("Preprocessing (optional)", options=["No Pre-processing", "Data Clean Up", "Preprocessing for Machine Learning"])
add_prefix = st.checkbox("Add project as prefix and date as suffix to filename (to overwrite existing files)", value=True)
provenance_source = st.text_input("Provenance Source (e.g., Wikipedia, Internal DB, Kaggle)")
source_url = st.text_input("Source URL (optional)")

uploaded_files = []
base_names = []
Expand Down Expand Up @@ -148,68 +171,117 @@ def main():
st.warning("Please select at least one file.")
elif not valid_basenames:
st.warning("Please fix invalid base names.")
elif provenance_source.strip() == "":
st.warning("Please enter a provenance source before uploading.")
elif source_url and not is_valid_url(source_url):
st.warning("The Source URL format appears to be invalid. Please enter a valid URL.")
else:
st.session_state.uploaded_filenames = []
for idx, file in enumerate(uploaded_files):
custom_name = generate_custom_filename(project, base_names[idx], file.name, add_prefix)

# Upload file to MinIO
upload_to_minio(file, custom_name, bucket_name_bronze)
st.session_state.uploaded_filenames.append(custom_name)

# Build provenance entry
new_entry = {
"upload_time": datetime.datetime.now().isoformat(),
"provenance_source": provenance_source,
"source_url": source_url,
"preprocessing": preprocessing,
"uploaded_by": os.getenv("USERNAME") or os.getenv("USER") or "unknown"
}

# Add a SHA-256 digest of the entry (an unkeyed integrity checksum, not a cryptographic signature)
entry_str = json.dumps(new_entry, sort_keys=True)
new_entry["signature"] = hashlib.sha256(entry_str.encode("utf-8")).hexdigest()

# Provenance structure
provenance = {
"filename": custom_name,
"original_filename": file.name,
"project": project,
"bucket": bucket_name_bronze,
"history": [new_entry]
}

# Upload provenance JSON
upload_provenance_to_minio(provenance, custom_name, bucket_name_bronze)

# Option to trigger ETL after all uploads
if st.session_state.uploaded_filenames:
if st.button("Triggering ETL for All Uploaded Files"):
for filename in st.session_state.uploaded_filenames:
trigger_etl(filename, preprocessing)
# Tab 2: View Bronze Files

# Tab 2: View Bronze Files
with tabs[1]:
st.header("Uploaded Files Overview - Bronze (dw-bucket-bronze)")
# Get the list of files from the "dw-bucket-bronze" bucket
files_by_project = get_file_list("dw-bucket-bronze")

if files_by_project:
available_projects = list(files_by_project.keys()) # Get project names (folders)
available_projects = list(files_by_project.keys())
selected_project = st.selectbox("Select Project Folder", available_projects)

if selected_project in files_by_project:
file_list = [{"Project": selected_project, "File": file} for file in files_by_project[selected_project]]

if file_list:
df = pd.DataFrame(file_list)
st.dataframe(df) # Display the table with the filtered list of files
st.dataframe(df)

selected_file = st.selectbox("Select File to Download", df["File"].tolist())

if st.button("Download Selected File from Bronze"):
file_content = download_file("dw-bucket-bronze", selected_project, selected_file)
if file_content:
st.download_button(label=f"Download {selected_file}", data=file_content, file_name=selected_file.split("/")[-1])



# Tab 3: View Silver Files
with tabs[2]:
st.header("Uploaded Files Overview - Silver (dw-bucket-silver)")
# Get the list of files from the "dw-bucket-silver" bucket
files_by_project = get_file_list("dw-bucket-silver")

if files_by_project:
available_projects = list(files_by_project.keys()) # Get project names (folders)
selected_project = st.selectbox("Select Project Folder", available_projects)
available_projects = list(files_by_project.keys())
selected_project = st.selectbox("Select Project Folder", available_projects, key="silver_project_select")

if selected_project in files_by_project:
file_list = [{"Project": selected_project, "File": file} for file in files_by_project[selected_project]]

if file_list:
df = pd.DataFrame(file_list)
st.dataframe(df) # Display the table with the filtered list of files
st.dataframe(df)

selected_file = st.selectbox("Select File to Download", df["File"].tolist())
selected_file = st.selectbox("Select File to Download", df["File"].tolist(), key="silver_file_select")

if st.button("Download Selected File from Silver"):
file_content = download_file("dw-bucket-silver", selected_project, selected_file)
if file_content:
st.download_button(label=f"Download {selected_file}", data=file_content, file_name=selected_file.split("/")[-1])


# Tab 4: View Provenance Logs
with tabs[3]:
st.header("Provenance Logs (MinIO)")
try:
objects = minio_client.list_objects(bucket_name_bronze, recursive=True)
provenance_files = [obj.object_name for obj in objects if obj.object_name.endswith(".provenance.json")]
except S3Error as e:
provenance_files = []
st.error(f"Failed to list provenance files: {e}")

if provenance_files:
selected_log = st.selectbox("Select a provenance log", provenance_files)
if selected_log:
try:
response = minio_client.get_object(bucket_name_bronze, selected_log)
data = json.load(response)
st.json(data)
except Exception as e:
st.error(f"Failed to load provenance log: {e}")
else:
st.info("No provenance logs found in MinIO.")

if __name__ == "__main__":
main()