-
Notifications
You must be signed in to change notification settings - Fork 0
feat: #1 - Jsonl Support #2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,14 @@ | ||
| """ | ||
| Configuration constants for the application. | ||
|
|
||
| This module contains reusable constants used across the application, | ||
| particularly for file processing and data transformation operations. | ||
| """ | ||
|
|
||
| # Delimiter used when flattening nested JSON objects into flat column names | ||
| # Example: {"user": {"name": "John"}} becomes {"user__name": "John"} | ||
| NESTED_FIELD_DELIMITER = "__" | ||
|
|
||
| # Delimiter used when creating column names for array indices | ||
| # Example: {"tags": ["python", "api"]} becomes {"tags_0": "python", "tags_1": "api"} | ||
| ARRAY_INDEX_DELIMITER = "_" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,12 +3,13 @@ | |
| import sqlite3 | ||
| import io | ||
| import re | ||
| from typing import Dict, Any, List | ||
| from typing import Dict, Any, List, Set | ||
| from .sql_security import ( | ||
| execute_query_safely, | ||
| validate_identifier, | ||
| SQLSecurityError | ||
| ) | ||
| from .constants import NESTED_FIELD_DELIMITER, ARRAY_INDEX_DELIMITER | ||
|
|
||
| def sanitize_table_name(table_name: str) -> str: | ||
| """ | ||
|
|
@@ -171,4 +172,183 @@ def convert_json_to_sqlite(json_content: bytes, table_name: str) -> Dict[str, An | |
| } | ||
|
|
||
| except Exception as e: | ||
| raise Exception(f"Error converting JSON to SQLite: {str(e)}") | ||
| raise Exception(f"Error converting JSON to SQLite: {str(e)}") | ||
|
|
||
def flatten_json_record(obj: Any, parent_key: str = "") -> Dict[str, Any]:
    """
    Recursively flatten a nested JSON object into a flat dictionary.

    - Nested dictionaries are flattened using NESTED_FIELD_DELIMITER
      (e.g., {"user": {"name": "John"}} -> {"user__name": "John"})
    - Nested lists are flattened using ARRAY_INDEX_DELIMITER with index
      notation (e.g., {"tags": ["a", "b"]} -> {"tags_0": "a", "tags_1": "b"})
    - Primitive values (strings, numbers, booleans, None) are kept as-is

    Known limitation: keys are merged with dict.update(), so a literal field
    name that already embeds a delimiter (e.g. "user__name" next to
    {"user": {"name": ...}}, or "items_0" next to {"items": [...]}) collides
    with the generated key and the later value silently wins.

    Args:
        obj: The object to flatten (dict, list, or primitive value)
        parent_key: The parent key path (used for recursion)

    Returns:
        A flat dictionary with concatenated keys
    """
    items: Dict[str, Any] = {}

    if isinstance(obj, dict):
        # Recurse into nested dictionaries, joining key paths with the delimiter
        for key, value in obj.items():
            new_key = f"{parent_key}{NESTED_FIELD_DELIMITER}{key}" if parent_key else key
            items.update(flatten_json_record(value, new_key))

    elif isinstance(obj, list):
        # Recurse into lists, suffixing the element index onto the key
        for idx, item in enumerate(obj):
            new_key = f"{parent_key}{ARRAY_INDEX_DELIMITER}{idx}"
            items.update(flatten_json_record(item, new_key))

    else:
        # Base case: primitive value (string, number, boolean, None).
        # Bug fix: a top-level primitive record (no parent key) previously
        # produced an empty-string column name; fall back to "value" instead.
        items[parent_key if parent_key else "value"] = obj

    return items
|
|
||
def discover_jsonl_schema(jsonl_content: bytes) -> Set[str]:
    """
    Scan the entire JSONL payload and collect every flattened field name.

    JSONL records may differ in shape from line to line (schema evolution),
    so a full pass over the file is required before a unified column set
    can be established.

    Args:
        jsonl_content: The raw JSONL file content as bytes

    Returns:
        A set of all unique flattened field names found across all records

    Raises:
        ValueError: If no valid JSON records are found or if parsing fails
    """
    discovered: Set[str] = set()
    record_count = 0

    for line_num, raw_line in enumerate(jsonl_content.decode('utf-8').strip().split('\n'), 1):
        stripped = raw_line.strip()
        if not stripped:
            # Blank lines are tolerated and skipped
            continue

        try:
            parsed = json.loads(stripped)
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON on line {line_num}: {str(e)}")

        discovered.update(flatten_json_record(parsed).keys())
        record_count += 1

    if record_count == 0:
        raise ValueError("No valid JSON records found in JSONL file")

    return discovered
|
|
||
def convert_jsonl_to_sqlite(jsonl_content: bytes, table_name: str) -> Dict[str, Any]:
    """
    Convert JSONL (JSON Lines) file content to a SQLite table.

    JSONL files contain one JSON object per line. This function:
    1. Discovers all possible fields across all records (handles schema evolution)
    2. Flattens nested structures using configurable delimiters
    3. Creates a pandas DataFrame with all discovered columns
    4. Writes to the SQLite database

    Args:
        jsonl_content: The raw JSONL file content as bytes
        table_name: The desired name for the SQLite table

    Returns:
        Dictionary containing:
            - table_name: The sanitized table name
            - schema: Dictionary mapping column names to SQLite types
            - row_count: Number of rows inserted
            - sample_data: List of sample records (up to 5)

    Raises:
        Exception: If parsing or database operations fail
    """
    conn = None
    try:
        # Sanitize the table name before it is interpolated into any SQL
        table_name = sanitize_table_name(table_name)

        # First pass: discover the union of fields across all records.
        # This also validates every line, so json.loads in the second
        # pass below cannot raise JSONDecodeError.
        all_fields = discover_jsonl_schema(jsonl_content)

        # Second pass: parse and flatten all records
        records = []
        for line in jsonl_content.decode('utf-8').strip().split('\n'):
            line = line.strip()
            if not line:
                continue

            flattened = flatten_json_record(json.loads(line))
            # Ensure all discovered fields are present (fill missing with None)
            records.append({field: flattened.get(field) for field in all_fields})

        if not records:
            raise ValueError("No valid records found in JSONL file")

        df = pd.DataFrame(records)

        # Clean column names (lowercase, replace spaces/dashes with underscores)
        df.columns = [col.lower().replace(' ', '_').replace('-', '_') for col in df.columns]

        conn = sqlite3.connect("db/database.db")

        # Write DataFrame to SQLite, replacing any existing table
        df.to_sql(table_name, conn, if_exists='replace', index=False)

        # Get schema information using safe query execution
        cursor_info = execute_query_safely(
            conn,
            "PRAGMA table_info({table})",
            identifier_params={'table': table_name}
        )
        columns_info = cursor_info.fetchall()
        schema = {col[1]: col[2] for col in columns_info}  # column_name: data_type

        # Get sample data (up to 5 rows) using safe query execution
        cursor_sample = execute_query_safely(
            conn,
            "SELECT * FROM {table} LIMIT 5",
            identifier_params={'table': table_name}
        )
        sample_rows = cursor_sample.fetchall()
        column_names = [col[1] for col in columns_info]
        sample_data = [dict(zip(column_names, row)) for row in sample_rows]

        # Get row count using safe query execution
        cursor_count = execute_query_safely(
            conn,
            "SELECT COUNT(*) FROM {table}",
            identifier_params={'table': table_name}
        )
        row_count = cursor_count.fetchone()[0]

        return {
            'table_name': table_name,
            'schema': schema,
            'row_count': row_count,
            'sample_data': sample_data
        }

    except Exception as e:
        # Bug fix: chain the original exception (`from e`) so the real
        # cause is preserved in the traceback instead of being discarded.
        raise Exception(f"Error converting JSONL to SQLite: {str(e)}") from e
    finally:
        # Bug fix: the original leaked the connection whenever a step after
        # sqlite3.connect() raised; always close it on both paths.
        if conn is not None:
            conn.close()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: Delimiter collision causes silent data loss during flattening
The `flatten_json_record` function uses `items.update()` to merge flattened results, which silently overwrites values when key collisions occur. If a JSON record contains a field name that already includes the delimiter (like `"user__name"`) alongside a nested structure that flattens to the same key (like `{"user": {"name": "value"}}`), the later value overwrites the earlier one without warning. Similarly, fields like `"items_0"` will collide with `{"items": ["value"]}`. This can cause silent data loss when processing JSONL files with field names containing `__` or `_N` patterns.