diff --git a/README.md b/README.md
index a804245..f978608 100644
--- a/README.md
+++ b/README.md
@@ -1,14 +1,29 @@
-# Hacker News
+# Hacker News API data processing pipeline
 
 ## Quick Start
 
+### First-time setup
+First, start Garage on its own so it can generate and store the access keys:
+```bash
+docker compose up -d garage-webui
+```
+In the Garage UI, available at **http://localhost:3909/**:
+1. Create an access key.
+2. Create a bucket named "bronze" and grant that same key access to it.
+3. Update the following environment variables of the `kafka-connect-setup` service in `docker-compose.yml`:
+   1. AWS_ACCESS_KEY_ID
+   2. AWS_SECRET_ACCESS_KEY
+
+Garage keeps the access keys in its dedicated metadata folder, so this setup only has to be done once.
+### Launch the project
+From the project root, run:
 ```bash
 docker-compose up
 ```
 
 ---
-
+## Available UIs
 ### Garage UI
 
 Open in browser: **http://localhost:3909/**
@@ -29,17 +44,17 @@ jupyter notebook explore_data.ipynb
 ## šŸ—ļø Architecture
 
 ```
-HN API → Kafka Producer → Kafka Topics
-              ↓
-    ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”
-    │  BRONZE Layer  │  ← Spark + Delta Lake
-    │  (Raw Data)    │  • Kafka → Delta
-    ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜  • ACID writes
+HN API → Kafka Producer → Kafka
               ↓
     ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”
-    │  SILVER Layer  │  ← Spark + Delta Lake
-    │  (Clean Data)  │  • HTML cleaning
-    ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜  • Quality scoring
+    │  BRONZE Layer  │
+    │  (Raw Data)    │ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”
+    ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜          │  ← Spark + Delta Lake
+              ↓                   │  • HTML cleaning
+    ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”         │  • aggregation
+    │  SILVER Layer  │ā†ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜
+    │  (Clean Data)  │
+    ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜
 ```
 
 ---
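Before bringing up the rest of the stack, it is worth checking that the key and bucket created above actually work. Below is a minimal smoke test with boto3 (an editor's sketch, not a file in this repo): it assumes Garage's S3 API on port 3900 is reachable from the host, and the key pair shown is a placeholder for the one generated in the UI.

```python
import boto3

# Placeholder credentials: substitute the key pair created in the Garage UI
s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:3900",  # Garage S3 API (assumes this port is published)
    aws_access_key_id="GK...",
    aws_secret_access_key="...",
    region_name="garage",  # the region name the connector configs below also use
)

# Succeeds (with zero objects at first) only if the key can access the bucket
resp = s3.list_objects_v2(Bucket="bronze")
print("bronze bucket reachable, objects:", resp.get("KeyCount", 0))
```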
diff --git a/bronze/__init__.py b/bronze/__init__.py
deleted file mode 100644
index 743182d..0000000
--- a/bronze/__init__.py
+++ /dev/null
@@ -1,2 +0,0 @@
-"""Bronze layer package - Raw data ingestion"""
-from .spark_loader import SparkBronzeLoader
diff --git a/bronze/main.py b/bronze/main.py
deleted file mode 100644
index 7690699..0000000
--- a/bronze/main.py
+++ /dev/null
@@ -1,52 +0,0 @@
-#!/usr/bin/env python3
-"""
-Run Spark Bronze Loader
-Loads data from Kafka to Bronze layer using Spark
-"""
-
-import sys
-import os
-
-# Add project root to path
-sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
-
-from spark_loader import SparkBronzeLoader
-from processor_common.test import a
-
-def main():
-    """Main execution function"""
-    print("=" * 60)
-    print("SPARK BRONZE LAYER LOADER")
-    print("=" * 60)
-
-    # Initialize Bronze loader
-    loader = SparkBronzeLoader(
-        bronze_path="data/bronze",
-        kafka_servers="kafka:29092"
-    )
-
-    try:
-        # Load all data from Kafka
-        stories_count, comments_count = loader.load_all()
-
-        print("\n" + "=" * 60)
-        print("SUCCESS!")
-        print("=" * 60)
-        print(f"Total stories loaded: {stories_count}")
-        print(f"Total comments loaded: {comments_count}")
-
-    except Exception as e:
-        print(f"\nāŒ Error: {e}")
-        import traceback
-        traceback.print_exc()
-        return 1
-
-    finally:
-        # Stop Spark session
-        loader.stop()
-
-    return 0
-
-
-if __name__ == "__main__":
-    sys.exit(main())
diff --git a/bronze/spark_loader.py b/bronze/spark_loader.py
deleted file mode 100644
index 0522e7c..0000000
--- a/bronze/spark_loader.py
+++ /dev/null
@@ -1,184 +0,0 @@
-"""
-Bronze Layer - Spark Batch Loader
-Reads from Kafka using Spark and saves to Parquet
-"""
-
-import logging
-from pyspark.sql import SparkSession
-from pyspark.sql.functions import from_json, col, lit, current_timestamp
-from pyspark.sql.types import (
-    StructType, StructField, StringType, IntegerType,
-    BooleanType, ArrayType, LongType
-)
-import os
-from delta import configure_spark_with_delta_pip
-
-logging.basicConfig(level=logging.INFO)
-logger = logging.getLogger(__name__)
-
-
-class SparkBronzeLoader:
-    """Spark-based Bronze layer loader - ingests data from Kafka to Parquet"""
-
-    def __init__(self, bronze_path: str = "data/bronze", kafka_servers: str = "kafka:29092"):
-        """
-        Initialize Spark Bronze Loader
-
-        Args:
-            bronze_path: Path to bronze data directory
-            kafka_servers: Kafka bootstrap servers
-        """
-        self.bronze_path = bronze_path
-        self.kafka_servers = kafka_servers
-        self.spark = self._create_spark_session()
-        logger.info(f"Initialized SparkBronzeLoader (bronze_path={bronze_path})")
-
-    def _create_spark_session(self) -> SparkSession:
-        """Create Spark session with Kafka and Delta Lake support"""
-        logger.info("Creating Spark session with Delta Lake...")
-
-        # Configure Spark with Kafka and Delta Lake
-        # Note: We configure Delta first, then add Kafka package
-        builder = SparkSession.builder \
-            .appName("HN Bronze Loader - Delta Lake") \
-            .master("local[*]") \
-            .config("spark.sql.adaptive.enabled", "true") \
-            .config("spark.sql.shuffle.partitions", "4") \
-            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
-            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
-
-        # Configure Delta Lake, then add Kafka package
-        spark = configure_spark_with_delta_pip(builder, extra_packages=["org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0"]).getOrCreate()
-
-        spark.sparkContext.setLogLevel("WARN")
-        logger.info("āœ“ Spark session created")
-        return spark
-
-    def _get_hn_story_schema(self) -> StructType:
-        """Define schema for HN stories"""
-        return StructType([
-            StructField("id", IntegerType(), True),
-            StructField("by", StringType(), True),
-            StructField("descendants", IntegerType(), True),
-            StructField("kids", ArrayType(IntegerType()), True),
-            StructField("score", IntegerType(), True),
-            StructField("time", LongType(), True),
-            StructField("title", StringType(), True),
-            StructField("type", StringType(), True),
-            StructField("url", StringType(), True),
-            StructField("text", StringType(), True)
-        ])
-
-    def _get_hn_comment_schema(self) -> StructType:
-        """Define schema for HN comments"""
-        return StructType([
-            StructField("id", IntegerType(), True),
-            StructField("by", StringType(), True),
-            StructField("kids", ArrayType(IntegerType()), True),
-            StructField("parent", IntegerType(), True),
-            StructField("story_id", IntegerType(), True),
-            StructField("text", StringType(), True),
-            StructField("time", LongType(), True),
-            StructField("type", StringType(), True),
-            StructField("deleted", BooleanType(), True),
-            StructField("dead", BooleanType(), True)
-        ])
-
-    def load_from_kafka_batch(self, topic: str, schema: StructType, table_name: str):
-        """
-        Load data from Kafka topic in batch mode using Spark
-
-        Args:
-            topic: Kafka topic name
-            schema: Schema for the data
-            table_name: Name for output table (stories or comments)
-        """
-        logger.info(f"Loading from Kafka topic: {topic}")
-
-        # Read from Kafka in batch mode (read all available data)
-        df = self.spark \
-            .read \
-            .format("kafka") \
-            .option("kafka.bootstrap.servers", self.kafka_servers) \
-            .option("subscribe", topic) \
-            .option("startingOffsets", "earliest") \
-            .option("endingOffsets", "latest") \
-            .load()
-
-        logger.info(f"Read {df.count()} messages from Kafka")
-
-        # Parse JSON value
-        df_parsed = df.select(
-            from_json(col("value").cast("string"), schema).alias("data"),
-            col("offset").alias("_kafka_offset"),
-            col("partition").alias("_kafka_partition")
-        ).select("data.*", "_kafka_offset", "_kafka_partition")
-
-        # Add ingestion timestamp
-        df_bronze = df_parsed.withColumn("_bronze_ingested_at", current_timestamp())
-
-        # Save to Delta Lake
-        output_path = f"{self.bronze_path}/{table_name}"
-        os.makedirs(output_path, exist_ok=True)
-
-        # Write to Delta Lake (append mode to preserve history)
-        df_bronze.write \
-            .format("delta") \
-            .mode("append") \
-            .option("mergeSchema", "true") \
-            .save(output_path)
-
-        record_count = df_bronze.count()
-        logger.info(f"āœ“ Saved {record_count} records to Delta table: {output_path}")
-        logger.info(f"  Columns: {df_bronze.columns}")
-
-        return record_count
-
-    def load_stories(self):
-        """Load stories from Kafka to Bronze layer using Spark"""
-        logger.info("\n" + "="*60)
-        logger.info("Loading STORIES with Spark")
-        logger.info("="*60)
-
-        schema = self._get_hn_story_schema()
-        count = self.load_from_kafka_batch("hn-stories", schema, "stories")
-        return count
-
-    def load_comments(self):
-        """Load comments from Kafka to Bronze layer using Spark"""
-        logger.info("\n" + "="*60)
-        logger.info("Loading COMMENTS with Spark")
-        logger.info("="*60)
-
-        schema = self._get_hn_comment_schema()
-        count = self.load_from_kafka_batch("hn-comments", schema, "comments")
-        return count
-
-    def load_all(self):
-        """
-        Load all data from Kafka to Bronze layer using Spark
-
-        Returns:
-            Tuple of (stories_count, comments_count)
-        """
-        logger.info("Starting Spark Bronze Batch Loader")
-
-        stories_count = self.load_stories()
-        comments_count = self.load_comments()
-
-        logger.info("\n" + "="*60)
-        logger.info("Spark Bronze Batch Loader Complete!")
-        logger.info("="*60)
-        logger.info(f"Stories: {stories_count} records")
-        logger.info(f"Comments: {comments_count} records")
-        logger.info(f"\nData saved to:")
-        logger.info(f"  - {self.bronze_path}/stories/")
-        logger.info(f"  - {self.bronze_path}/comments/")
-
-        return (stories_count, comments_count)
-
-    def stop(self):
-        """Stop Spark session"""
-        if self.spark:
-            self.spark.stop()
-            logger.info("Spark session stopped")
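With `SparkBronzeLoader` removed, the Kafka-to-bronze hop is handled by the Kafka Connect S3 sink configured in `docker-compose.yml` below, which writes raw JSON objects into the `bronze` bucket on Garage. Here is a minimal sketch of reading that data back with Spark; it assumes the sink's default `topics/<topic>/partition=<n>/` path layout, the `hadoop-aws` package on the Spark classpath, and placeholder credentials.

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("read-bronze-from-garage")
    # Point the s3a filesystem at Garage instead of AWS
    .config("spark.hadoop.fs.s3a.endpoint", "http://garage:3900")
    .config("spark.hadoop.fs.s3a.access.key", "GK...")   # placeholder
    .config("spark.hadoop.fs.s3a.secret.key", "...")     # placeholder
    .config("spark.hadoop.fs.s3a.path.style.access", "true")  # Garage expects path-style URLs
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .getOrCreate()
)

# The sink writes one JSON record per Kafka message, flushed in files of
# flush.size (100) records; Spark discovers the partition=<n> directories.
stories = spark.read.json("s3a://bronze/topics/hn-stories/")
stories.printSchema()
```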
"latest") \ - .load() - - logger.info(f"Read {df.count()} messages from Kafka") - - # Parse JSON value - df_parsed = df.select( - from_json(col("value").cast("string"), schema).alias("data"), - col("offset").alias("_kafka_offset"), - col("partition").alias("_kafka_partition") - ).select("data.*", "_kafka_offset", "_kafka_partition") - - # Add ingestion timestamp - df_bronze = df_parsed.withColumn("_bronze_ingested_at", current_timestamp()) - - # Save to Delta Lake - output_path = f"{self.bronze_path}/{table_name}" - os.makedirs(output_path, exist_ok=True) - - # Write to Delta Lake (append mode to preserve history) - df_bronze.write \ - .format("delta") \ - .mode("append") \ - .option("mergeSchema", "true") \ - .save(output_path) - - record_count = df_bronze.count() - logger.info(f"āœ“ Saved {record_count} records to Delta table: {output_path}") - logger.info(f" Columns: {df_bronze.columns}") - - return record_count - - def load_stories(self): - """Load stories from Kafka to Bronze layer using Spark""" - logger.info("\n" + "="*60) - logger.info("Loading STORIES with Spark") - logger.info("="*60) - - schema = self._get_hn_story_schema() - count = self.load_from_kafka_batch("hn-stories", schema, "stories") - return count - - def load_comments(self): - """Load comments from Kafka to Bronze layer using Spark""" - logger.info("\n" + "="*60) - logger.info("Loading COMMENTS with Spark") - logger.info("="*60) - - schema = self._get_hn_comment_schema() - count = self.load_from_kafka_batch("hn-comments", schema, "comments") - return count - - def load_all(self): - """ - Load all data from Kafka to Bronze layer using Spark - - Returns: - Tuple of (stories_count, comments_count) - """ - logger.info("Starting Spark Bronze Batch Loader") - - stories_count = self.load_stories() - comments_count = self.load_comments() - - logger.info("\n" + "="*60) - logger.info("Spark Bronze Batch Loader Complete!") - logger.info("="*60) - logger.info(f"Stories: {stories_count} records") - logger.info(f"Comments: {comments_count} records") - logger.info(f"\nData saved to:") - logger.info(f" - {self.bronze_path}/stories/") - logger.info(f" - {self.bronze_path}/comments/") - - return (stories_count, comments_count) - - def stop(self): - """Stop Spark session""" - if self.spark: - self.spark.stop() - logger.info("Spark session stopped") diff --git a/docker-compose.yml b/docker-compose.yml index 01e14eb..0dcb51d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -31,12 +31,19 @@ services: KAFKA_LOG_RETENTION_HOURS: 168 networks: - hackernews-network + healthcheck: + test: [ "CMD", "kafka-broker-api-versions", "--bootstrap-server", "localhost:9092" ] + interval: 10s + timeout: 10s + retries: 10 + start_period: 40s kafka-ui: image: provectuslabs/kafka-ui:latest container_name: kafka-ui depends_on: - - kafka + kafka: + condition: service_healthy ports: - "8082:8080" environment: @@ -74,6 +81,8 @@ services: - SPARK_USER=spark ports: - '8081:8081' + depends_on: + - spark garage: image: grosinosky/garage:v2.1.0 @@ -90,8 +99,10 @@ services: - garage-data:/var/lib/garage/data - garage-meta:/var/lib/garage/meta restart: unless-stopped + networks: + - hackernews-network - webui: + garage-webui: image: khairul169/garage-webui:1.1.0 container_name: garage-webui restart: unless-stopped @@ -101,19 +112,121 @@ services: API_BASE_URL: "http://garage:3903" API_ADMIN_KEY: "changeme1234567890" S3_ENDPOINT_URL: "http://garage:3900" + depends_on: + - garage + networks: + - hackernews-network - bronze-processor: + kafka-init-job: build: - 
+  kafka-connect-setup:
+    image: confluentinc/cp-kafka-connect:7.5.0
+    container_name: kafka-connect-setup
     depends_on:
-      - kafka-init-job
+      kafka-connect:
+        condition: service_healthy
     networks:
       - hackernews-network
+    environment:
+      - CONNECT_URL=http://kafka-connect:8083
+      - AWS_ACCESS_KEY_ID=GKff6ba2b3455ca4194ebe34c1
+      - AWS_SECRET_ACCESS_KEY=81bd3b81dfe623301827b6fd5b9bea3fab1dea4aebac1e2793238655e37fe454
+      - S3_BUCKET=bronze
+      - S3_ENDPOINT=http://garage:3900
+    command:
+      - bash
+      - -c
+      - |
+        curl -X POST http://kafka-connect:8083/connectors -H "Content-Type: application/json" -d '{
+          "name": "s3-sink-hn-stories",
+          "config": {
+            "connector.class": "io.confluent.connect.s3.S3SinkConnector",
+            "tasks.max": "1",
+            "topics": "hn-stories",
+            "s3.bucket.name": "bronze",
+            "s3.region": "garage",
+            "flush.size": "100",
+            "storage.class": "io.confluent.connect.s3.storage.S3Storage",
+            "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
+            "store.url": "http://garage:3900",
+            "s3.path.style.access.enabled": "true"
+          }
+        }'
+        curl -X POST http://kafka-connect:8083/connectors -H "Content-Type: application/json" -d '{
+          "name": "s3-sink-hn-comments",
+          "config": {
+            "connector.class": "io.confluent.connect.s3.S3SinkConnector",
+            "tasks.max": "1",
+            "storage.class": "io.confluent.connect.s3.storage.S3Storage",
+            "topics": "hn-comments",
+            "s3.bucket.name": "bronze",
+            "s3.region": "garage",
+            "flush.size": "100",
+            "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
+            "store.url": "http://garage:3900",
+            "s3.path.style.access.enabled": "true"
+          }
+        }'
+    restart: "no"
 
   silver-processor:
     build:
@@ -124,16 +237,28 @@
     volumes:
       - ./silver:/code
       - ./processor_common:/code/processor_common
     depends_on:
-      - bronze-processor
+      kafka-init-job:
+        condition: service_completed_successfully
+      kafka-connect-setup:
+        condition: service_completed_successfully
+      garage:
+        condition: service_started
+      spark-worker:
+        condition: service_started
+    networks:
+      - hackernews-network
 
-  kafka-init-job:
+  gold-processor:
     build:
-      context: ./
-      dockerfile: utils/Dockerfile
-      container_name: kafka-job
+      context: .
+      dockerfile: Dockerfile
+    container_name: gold-processor
+    volumes:
+      - ./gold:/code
+      - ./processor_common:/code/processor_common
     depends_on:
-      kafka:
-        condition: service_started
+      silver-processor:
+        condition: service_completed_successfully
     networks:
       - hackernews-network
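Once `kafka-connect-setup` has run, the Kafka Connect REST API (published on port 8083 above) can confirm that both sinks were registered and are running. A minimal check using the standard Connect endpoints, stdlib only:

```python
import json
import urllib.request

BASE = "http://localhost:8083"

# List registered connectors; both s3-sink-hn-* names should appear
with urllib.request.urlopen(f"{BASE}/connectors") as resp:
    print("registered:", json.load(resp))

# Each connector should report state RUNNING once healthy
for name in ("s3-sink-hn-stories", "s3-sink-hn-comments"):
    with urllib.request.urlopen(f"{BASE}/connectors/{name}/status") as resp:
        status = json.load(resp)
    print(name, "->", status["connector"]["state"])
```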
"http://garage:3900", + "s3.path.style.access.enabled": "true" + } + }' + restart: "no" silver-processor: build: @@ -124,16 +237,28 @@ services: - ./silver:/code - ./processor_common:/code/processor_common depends_on: - - bronze-processor + kafka-init-job: + condition: service_completed_successfully + kafka-connect-setup: + condition: service_completed_successfully + garage: + condition: service_started + spark-worker: + condition: service_started + networks: + - hackernews-network - kafka-init-job: + gold-processor: build: - context: ./ - dockerfile: utils/Dockerfile - container_name: kafka-job + context: . + dockerfile: Dockerfile + container_name: gold-processor + volumes: + - ./gold:/code + - ./processor_common:/code/processor_common depends_on: - kafka: - condition: service_started + silver-processor: + condition: service_completed_successfully networks: - hackernews-network diff --git a/gold/main.py b/gold/main.py new file mode 100644 index 0000000..aee2c07 --- /dev/null +++ b/gold/main.py @@ -0,0 +1 @@ +print("Wello Horld! ;)") \ No newline at end of file