78 changes: 78 additions & 0 deletions aircan/dags/api_ckan_import_to_bq_v2.py
@@ -0,0 +1,78 @@

import logging
import time
import json
import ast
from datetime import date, datetime

# Local imports
from aircan.dependencies.google_cloud.bigquery_handler_v2 import bq_import_csv

# Third-party library imports
from airflow import DAG
from airflow.exceptions import AirflowException

from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago


args = {
    'start_date': days_ago(0),
    'params': {
        "resource": {
            "path": "path/to/my.csv",
            "format": "CSV",
            "ckan_resource_id": "res-id-123",
            "schema": {
                "fields": "['field1', 'field2']"
            }
        },
        "ckan_config": {
            "api_key": "API_KEY",
            "site_url": "URL",
        },
        "big_query": {
            "bq_project_id": "bigquery_project_id",
            "bq_dataset_id": "bigquery_dataset_id"
        },
        "output_bucket": str(date.today())
    }
}

dag = DAG(
    dag_id='ckan_api_import_to_bq_v2',
    default_args=args,
    schedule_interval=None
)

def task_import_resource_to_bq(**context):
    logging.info('Invoking import resource to bigquery')
    logging.info("resource: {}".format(context['params'].get('resource', {})))

    gc_file_url = context['params'].get('big_query', {}).get('gcs_uri')
    bq_project_id = context['params'].get('big_query', {}).get('bq_project_id')
    bq_dataset_id = context['params'].get('big_query', {}).get('bq_dataset_id')
    bq_table_name = context['params'].get('big_query', {}).get('bq_table_name')
    logging.info("bq_table_name: {}".format(bq_table_name))

    # The schema may arrive as a plain JSON string or double-encoded,
    # hence the extra literal_eval pass below.
    raw_schema = context['params'].get('resource', {}).get('schema')
    eval_schema = json.loads(raw_schema)
    if isinstance(eval_schema, str):
        eval_schema = ast.literal_eval(eval_schema)
    schema = eval_schema.get('fields')
    logging.info("SCHEMA: {}".format(schema))

    # sample bq_table_id: "bigquerytest-271707.nhs_test.dag_test"
    bq_table_id = '%s.%s.%s' % (bq_project_id, bq_dataset_id, bq_table_name)
    logging.info('Importing %s to BQ %s' % (gc_file_url, bq_table_id))
    ckan_conf = context['params'].get('ckan_config', {})
    ckan_conf['resource_id'] = context['params'].get('resource', {}).get('ckan_resource_id')
    bq_import_csv(bq_table_id, gc_file_url, schema, ckan_conf)

import_resource_to_bq_task = PythonOperator(
    task_id='import_resource_to_bq_v2',
    provide_context=True,
    python_callable=task_import_resource_to_bq,
    dag=dag,
)
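For context (not part of the diff): task_import_resource_to_bq parses the schema twice because the value may arrive either as a plain JSON string or double-encoded. A minimal standalone sketch of that round-trip, with illustrative payloads:

import ast
import json

# Plain JSON string -> dict in one pass.
raw_schema = '{"fields": [{"name": "field1", "type": "string"}]}'
parsed = json.loads(raw_schema)

# Double-encoded payload: json.loads() yields a string holding a Python
# dict literal, which ast.literal_eval() then turns into a dict.
double_encoded = '"{\'fields\': [{\'name\': \'field1\', \'type\': \'string\'}]}"'
parsed2 = json.loads(double_encoded)
if isinstance(parsed2, str):
    parsed2 = ast.literal_eval(parsed2)

assert parsed.get('fields') == parsed2.get('fields')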
95 changes: 95 additions & 0 deletions aircan/dependencies/google_cloud/bigquery_handler_v2.py
@@ -0,0 +1,95 @@
from google.cloud import bigquery
import google.api_core.exceptions
from aircan.dependencies.utils import AirflowCKANException, aircan_status_update
import json
import logging

def replace_all(dict, string):
    # Replace every occurrence of each key in `dict` within `string`.
    for key in dict:
        string = string.replace(key, dict[key])
    return string

def bq_import_csv(table_id, gcs_path, table_schema, ckan_conf):
    try:
        client = bigquery.Client()

        job_config = bigquery.LoadJobConfig()

        schema = bq_schema_from_table_schema(table_schema)
        job_config.schema = schema

        job_config.skip_leading_rows = 1
        job_config.source_format = bigquery.SourceFormat.CSV
        # overwrite the table
        job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
        # Set to True for schema autodetect; kept off since the schema is
        # defined explicitly when publishing data using datapub.
        # job_config.autodetect = True
        load_job = client.load_table_from_uri(
            gcs_path, table_id, job_config=job_config
        )

        load_job.result()  # Waits for table load to complete.
        destination_table = client.get_table(table_id)
        status_dict = {
            'res_id': ckan_conf.get('resource_id'),
            'state': 'progress',
            'message': 'Data ingestion is in progress.'
        }
        aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)
⚠️ Potential issue

Fix incorrect status update timing.

The status update logic is incorrect: it sends a "progress" status after the job has already completed successfully. This update should be sent before starting the job.

+        # Update status before starting ingestion
+        status_dict = {
+            'res_id': ckan_conf.get('resource_id'),
+            'state': 'progress',
+            'message': 'Data ingestion is in progress.'
+        }
+        aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)
+        
         load_job = client.load_table_from_uri(
             gcs_path, table_id, job_config=job_config
         )

         load_job.result()  # Waits for table load to complete.
         destination_table = client.get_table(table_id)
-        status_dict = {
-            'res_id': ckan_conf.get('resource_id'),
-            'state': 'progress',
-            'message': 'Data ingestion is in progress.'
-        }
-        aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)

        if destination_table:
            status_dict = {
                'res_id': ckan_conf.get('resource_id'),
                'state': 'complete',
                'message': "Ingession Completed"
            }
            aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)
            return {'success': True, 'message': 'BigQuery Table created successfully.'}
🛠️ Refactor suggestion

Simplify redundant condition and fix typo.

The condition if destination_table: is always true since get_table() would raise an exception if the table doesn't exist. Also, there's a typo in the message.

-        if destination_table:
-            status_dict = {
-                'res_id': ckan_conf.get('resource_id'),
-                'state': 'complete',
-                'message': "Ingession Completed"
-            }
-            aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)
-            return {'success': True, 'message': 'BigQuery Table created successfully.'}
+        # Update status to complete
+        status_dict = {
+            'res_id': ckan_conf.get('resource_id'),
+            'state': 'complete',
+            'message': "Ingestion Completed"
+        }
+        aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)
+        return {'success': True, 'message': 'BigQuery Table created successfully.'}
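For reference, get_table() signals a missing table by raising rather than returning a falsy value, which is why the check can never be False. A standalone sketch (the table id is illustrative):

from google.cloud import bigquery
import google.api_core.exceptions

client = bigquery.Client()
try:
    destination_table = client.get_table('my-project.my_dataset.missing_table')
except google.api_core.exceptions.NotFound:
    # A missing table surfaces here; get_table() never returns None.
    destination_table = None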

    except Exception as e:
        replacers = {
            'gs://dx-nhs-staging-giftless/': '',
            'gs://dx-nhs-production-giftless/': '',
            'gs://dx-nhs-prod-giftless/': '',
            'https://bigquery.googleapis.com/bigquery/v2/projects/datopian-dx/jobs?prettyPrint=false': '',
            'datopian-dx': '',
            'bigquery': '',
            'googleapi': '',
            'google': ''
        }
Comment on lines +104 to +114
🛠️ Refactor suggestion

Make error sanitization configurable.

The hardcoded bucket names and project IDs make this code environment-specific and brittle. Consider making this configurable.

-        replacers = {
-            'gs://dx-nhs-staging-giftless/': '',
-            'gs://dx-nhs-production-giftless/': '',
-            'gs://dx-nhs-prod-giftless/': '',
-            'https://bigquery.googleapis.com/bigquery/v2/projects/datopian-dx/jobs?prettyPrint=false': '',
-            'datopian-dx': '',
-            'bigquery': '',
-            'googleapi': '',
-            'google': ''
-
-        }
+        # Configurable sanitization - could be passed as parameter or from config
+        replacers = ckan_conf.get('error_sanitization', {
+            'gs://dx-nhs-staging-giftless/': '',
+            'gs://dx-nhs-production-giftless/': '',
+            'gs://dx-nhs-prod-giftless/': '',
+            'https://bigquery.googleapis.com/bigquery/v2/projects/datopian-dx/jobs?prettyPrint=false': '',
+            'datopian-dx': '',
+            'bigquery': '',
+            'googleapi': '',
+            'google': ''
+        })

Committable suggestion skipped: line range outside the PR's diff.
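One possible direction for this refactor (a sketch, not the existing API): load the replacers from an Airflow Variable so each deployment supplies its own values. The variable name AIRCAN_ERROR_REPLACERS is hypothetical:

import json
import logging

from airflow.models import Variable

def load_error_replacers():
    # Hypothetical Airflow Variable holding a JSON object,
    # e.g. {"gs://my-bucket/": "", "my-project-id": ""}.
    raw = Variable.get('AIRCAN_ERROR_REPLACERS', default_var='{}')
    try:
        return json.loads(raw)
    except ValueError:
        logging.warning('AIRCAN_ERROR_REPLACERS is not valid JSON; ignoring it')
        return {}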


        e = replace_all(replacers, str(e))
        logging.info(e)
        status_dict = {
            'res_id': ckan_conf.get('resource_id'),
            'state': 'failed',
            'message': str(e)
        }
        aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)
        raise AirflowCKANException('Data ingestion has failed.', str(e))
🛠️ Refactor suggestion

Use proper exception chaining.

When re-raising exceptions within an except block, use raise ... from err to preserve the original exception context.

-        raise AirflowCKANException('Data ingestion has failed.', str(e))
+        raise AirflowCKANException('Data ingestion has failed.', str(e)) from e
🧰 Tools
🪛 Ruff (0.12.2)

124-124: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)
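A standalone illustration of what "from e" preserves (the exception types are illustrative):

class IngestionError(Exception):
    pass

try:
    try:
        raise ValueError('bad CSV row')
    except ValueError as e:
        # `from e` records the original error as __cause__, so tracebacks
        # print it under "The above exception was the direct cause of ...".
        raise IngestionError('Data ingestion has failed.') from e
except IngestionError as err:
    assert isinstance(err.__cause__, ValueError)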




def bq_schema_from_table_schema(table_schema):
    # Map Frictionless Table Schema field types to BigQuery column types.
    mapping = {
        'string': 'STRING',
        'number': 'NUMERIC',
        'integer': 'NUMERIC',
        'boolean': 'BOOLEAN',
        'object': 'STRING',
        'array': 'STRING',
        'date': 'DATE',
        'time': 'TIME',
        'datetime': 'DATETIME',
        'year': 'NUMERIC',
        'yearmonth': 'STRING',
        'duration': 'DATETIME',
        'geopoint': 'GEOPOINT',
        'geojson': 'STRING',
        'any': 'STRING'
    }

    def _convert(field):
        # TODO: support for e.g. required
        return bigquery.SchemaField(field['name'],
                                    mapping.get(field['type'], field['type']),
                                    'NULLABLE')
    return [_convert(field) for field in table_schema]
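A quick usage sketch (the field list is illustrative); note that unmapped types pass through unchanged via mapping.get(field['type'], field['type']):

fields = [
    {'name': 'id', 'type': 'integer'},
    {'name': 'created', 'type': 'datetime'},
]
schema = bq_schema_from_table_schema(fields)
# -> [SchemaField('id', 'NUMERIC', 'NULLABLE'),
#     SchemaField('created', 'DATETIME', 'NULLABLE')]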