93 changes: 93 additions & 0 deletions aircan/dags/api_ckan_import_to_bq_v2.py
@@ -0,0 +1,93 @@

import logging
import time
import json
import ast
from datetime import date, datetime

# Local imports
from aircan.dependencies.google_cloud.bigquery_handler_v2 import bq_import_csv
from aircan.dependencies.utils import aircan_status_update_nhs as aircan_status_update

# Third-party library imports
from airflow import DAG
from airflow.exceptions import AirflowException

from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
import traceback

args = {
    'start_date': days_ago(0),
    'params': {
        "resource": {
            "path": "path/to/my.csv",
            "format": "CSV",
            "ckan_resource_id": "res-id-123",
            "schema": {
                "fields": "['field1', 'field2']"
            }
        },
        "ckan_config": {
            "api_key": "API_KEY",
            "site_url": "URL",
        },
        "big_query": {
            "bq_project_id": "bigquery_project_id",
            "bq_dataset_id": "bigquery_dataset_id"
        },
        "output_bucket": str(date.today())
    }
}

dag = DAG(
    dag_id='ckan_api_import_to_bq_v2',
    default_args=args,
    schedule_interval=None
)

def task_import_resource_to_bq(**context):
    ckan_api_key = context['params'].get('ckan_config', {}).get('api_key')
    ckan_site_url = context['params'].get('ckan_config', {}).get('site_url')
    logging.info('Invoking import resource to bigquery')
    logging.info("resource: {}".format(context['params'].get('resource', {})))

    gc_file_url = context['params'].get('big_query', {}).get('gcs_uri')
    bq_project_id = context['params'].get('big_query', {}).get('bq_project_id')
    bq_dataset_id = context['params'].get('big_query', {}).get('bq_dataset_id')
    bq_table_name = context['params'].get('big_query', {}).get('bq_table_name')
    logging.info("bq_table_name: {}".format(bq_table_name))

    raw_schema = context['params'].get('resource', {}).get('schema')
    eval_schema = json.loads(raw_schema)
    if isinstance(eval_schema, str):
        eval_schema = ast.literal_eval(eval_schema)
    schema = eval_schema.get('fields')
    logging.info("SCHEMA: {}".format(schema))

    # sample bq_table_id: "bigquerytest-271707.nhs_test.dag_test"
    bq_table_id = '%s.%s.%s' % (bq_project_id, bq_dataset_id, bq_table_name)
    logging.info('Importing %s to BQ %s' % (gc_file_url, bq_table_id))
    ckan_conf = context['params'].get('ckan_config', {})
    ckan_conf['resource_id'] = context['params'].get('resource', {}).get('ckan_resource_id')
    dag_run_id = context['run_id']
    res_id = ckan_conf.get('resource_id')
    ckan_conf['dag_run_id'] = dag_run_id
    bq_import_csv(bq_table_id, gc_file_url, schema, ckan_conf)
    status_dict = {
        'dag_run_id': dag_run_id,
        'resource_id': res_id,
        'state': 'complete',
        'message': 'Data ingestion completed successfully for "{res_id}".'.format(
            res_id=res_id),
        'clear_logs': True
    }
    aircan_status_update(ckan_site_url, ckan_api_key, status_dict)

import_resource_to_bq_task = PythonOperator(
    task_id='import_resource_to_bq_v2',
    provide_context=True,
    python_callable=task_import_resource_to_bq,
    dag=dag,
)
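For reference, a minimal sketch of the conf payload this DAG reads via context['params'] when triggered. This is not part of the PR: the bucket, project, dataset, table, and site URL values are placeholders, the schema is passed as a JSON string to match the json.loads / ast.literal_eval handling in the task, and it assumes core.dag_run_conf_overrides_params is enabled so a trigger conf overrides the defaults declared in args above.

# Hypothetical trigger-conf sketch (placeholders throughout), shaped like the
# 'params' structure task_import_resource_to_bq reads from context['params'].
import json

conf = {
    "resource": {
        "path": "path/to/my.csv",
        "format": "CSV",
        "ckan_resource_id": "res-id-123",
        # schema arrives as a string and is unpacked in the task
        "schema": json.dumps({
            "fields": [
                {"name": "field1", "type": "string"},
                {"name": "field2", "type": "integer"},
            ]
        }),
    },
    "ckan_config": {"api_key": "API_KEY", "site_url": "https://ckan.example.org"},
    "big_query": {
        "bq_project_id": "my-project",       # placeholder
        "bq_dataset_id": "my_dataset",       # placeholder
        "bq_table_name": "my_table",         # placeholder
        "gcs_uri": "gs://my-bucket/my.csv",  # placeholder; read by the task as big_query.gcs_uri
    },
}

# e.g. airflow dags trigger ckan_api_import_to_bq_v2 --conf "$(cat conf.json)"
print(json.dumps(conf, indent=2))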
113 changes: 113 additions & 0 deletions aircan/dependencies/google_cloud/bigquery_handler_v2.py
@@ -0,0 +1,113 @@
from google.cloud import bigquery
import google.api_core.exceptions
from aircan.dependencies.utils import AirflowCKANException, aircan_status_update_nhs as aircan_status_update
import json
import logging

def replace_all(dict, string):
    for key in dict:
        string = string.replace(key, dict[key])
    return string

def bq_import_csv(table_id, gcs_path, table_schema, ckan_conf):
    try:
        client = bigquery.Client()

        try:
            job_config = bigquery.LoadJobConfig()

            schema = bq_schema_from_table_schema(table_schema)
            job_config.schema = schema

            job_config.skip_leading_rows = 1
            job_config.source_format = bigquery.SourceFormat.CSV
            # overwrite the table
            job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
            # schema autodetect is left off because the schema is defined explicitly when publishing data with datapub
            # job_config.autodetect = True
            load_job = client.load_table_from_uri(
                gcs_path, table_id, job_config=job_config
            )

            load_job.result()  # Waits for table load to complete.
            destination_table = client.get_table(table_id)
        except Exception as e:
            # Fall back to loading without an explicit schema.
            job_config = bigquery.LoadJobConfig()

            job_config.skip_leading_rows = 1
            job_config.source_format = bigquery.SourceFormat.CSV
            # overwrite the table
            job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
            # schema autodetect is left off because the schema is defined explicitly when publishing data with datapub
            # job_config.autodetect = True
            load_job = client.load_table_from_uri(
                gcs_path, table_id, job_config=job_config
            )
            load_job.result()  # Waits for table load to complete.
            destination_table = client.get_table(table_id)
        status_dict = {
            'res_id': ckan_conf.get('resource_id'),
            'state': 'progress',
            'message': 'Data ingestion is in progress.',
            'dag_run_id': ckan_conf.get('dag_run_id')
        }
        aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)
        if destination_table:
            status_dict = {
                'res_id': ckan_conf.get('resource_id'),
                'state': 'complete',
                'message': "Ingestion Completed",
                'dag_run_id': ckan_conf.get('dag_run_id')
            }
            aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)
        return {'success': True, 'message': 'BigQuery Table created successfully.'}
    except Exception as e:
        replacers = {
            'gs://dx-nhs-staging-giftless/': '',
            'gs://dx-nhs-production-giftless/': '',
            'gs://dx-nhs-prod-giftless/': '',
            'https://bigquery.googleapis.com/bigquery/v2/projects/datopian-dx/jobs?prettyPrint=false': '',
            'datopian-dx': '',
            'bigquery': '',
            'googleapi': '',
            'google': ''

        }
Comment on lines +104 to +114

🛠️ Refactor suggestion

Make error sanitization configurable.

The hardcoded bucket names and project IDs make this code environment-specific and brittle. Consider making this configurable.

-        replacers = {
-            'gs://dx-nhs-staging-giftless/': '',
-            'gs://dx-nhs-production-giftless/': '',
-            'gs://dx-nhs-prod-giftless/': '',
-            'https://bigquery.googleapis.com/bigquery/v2/projects/datopian-dx/jobs?prettyPrint=false': '',
-            'datopian-dx': '',
-            'bigquery': '',
-            'googleapi': '',
-            'google': ''
-
-        }
+        # Configurable sanitization - could be passed as parameter or from config
+        replacers = ckan_conf.get('error_sanitization', {
+            'gs://dx-nhs-staging-giftless/': '',
+            'gs://dx-nhs-production-giftless/': '',
+            'gs://dx-nhs-prod-giftless/': '',
+            'https://bigquery.googleapis.com/bigquery/v2/projects/datopian-dx/jobs?prettyPrint=false': '',
+            'datopian-dx': '',
+            'bigquery': '',
+            'googleapi': '',
+            'google': ''
+        })

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In aircan/dependencies/google_cloud/bigquery_handler_v2.py around lines 48 to
58, the error sanitization uses hardcoded bucket names and project IDs, making
it environment-specific and brittle. Refactor the code to accept these replacer
values from a configuration source such as environment variables or a config
file, allowing different environments to specify their own values without
changing the code.
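One possible shape for that, sketched under the assumption that an environment variable carries the environment-specific strings as JSON; the variable name AIRCAN_ERROR_REPLACERS and the helper below are invented for illustration and are not part of this PR (an Airflow Variable would work the same way).

# Hypothetical sketch: load sanitization replacers from the environment instead
# of hardcoding them. AIRCAN_ERROR_REPLACERS is an invented variable name.
import json
import logging
import os


def load_error_replacers(default=None):
    """Return a {needle: replacement} dict from AIRCAN_ERROR_REPLACERS, if set."""
    raw = os.environ.get("AIRCAN_ERROR_REPLACERS")
    if not raw:
        return default or {}
    try:
        return json.loads(raw)
    except ValueError:
        logging.warning("AIRCAN_ERROR_REPLACERS is not valid JSON; ignoring it.")
        return default or {}

The except block would then sanitize with e = replace_all(load_error_replacers(), str(e)), keeping bucket names and project IDs out of the source.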

        e = replace_all(replacers, str(e))
        logging.info(e)
        status_dict = {
            'res_id': ckan_conf.get('resource_id'),
            'dag_run_id': ckan_conf.get('dag_run_id'),
            'state': 'failed',
            'message': str(e)
        }
        aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)
        raise AirflowCKANException('Data ingestion has failed.', str(e))

🛠️ Refactor suggestion

Use proper exception chaining.

When re-raising exceptions within an except block, use raise ... from err to preserve the original exception context.

-        raise AirflowCKANException('Data ingestion has failed.', str(e))
+        raise AirflowCKANException('Data ingestion has failed.', str(e)) from e
📝 Committable suggestion


Suggested change
raise AirflowCKANException('Data ingestion has failed.', str(e))
raise AirflowCKANException('Data ingestion has failed.', str(e)) from e
🧰 Tools
🪛 Ruff (0.12.2)

124-124: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

🤖 Prompt for AI Agents
In aircan/dependencies/google_cloud/bigquery_handler_v2.py at line 124, the
exception is re-raised without proper chaining, losing the original error
context. Modify the raise statement to use "raise AirflowCKANException('Data
ingestion has failed.', str(e)) from e" to preserve the original exception
context for better debugging.
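A tiny self-contained illustration of what the from clause buys, independent of this PR's code: the original error is attached as __cause__ and the traceback reports it as the direct cause rather than as an error raised during exception handling.

# Minimal demonstration of explicit exception chaining.
class IngestionError(Exception):
    pass


def ingest():
    try:
        int("not-a-number")  # stand-in for the BigQuery load failing
    except ValueError as err:
        raise IngestionError("Data ingestion has failed.") from err


try:
    ingest()
except IngestionError as exc:
    # __cause__ carries the original ValueError; a plain raise inside the
    # except block would only set __context__ (implicit chaining).
    print(type(exc.__cause__).__name__)  # -> ValueError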



def bq_schema_from_table_schema(table_schema):
    mapping = {
        'string': 'STRING',
        'number': 'NUMERIC',
        'integer': 'NUMERIC',
        'boolean': 'BOOLEAN',
        'object': 'STRING',
        'array': 'STRING',
        'date': 'DATE',
        'time': 'TIME',
        'datetime': 'DATETIME',
        'year': 'NUMERIC',
        'yearmonth': 'STRING',
        'duration': 'DATETIME',
        'geopoint': 'GEOPOINT',
        'geojson': 'STRING',
        'any': 'STRING'
    }

    def _convert(field):
        # TODO: support for e.g. required
        return bigquery.SchemaField(field['name'],
                                    mapping.get(field['type'], field['type']),
                                    'NULLABLE'
                                    )
    return [_convert(field) for field in table_schema]
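For reference, a short sketch of how this converter consumes a Table Schema style field list; the field names below are illustrative only, and running it requires the aircan package and google-cloud-bigquery to be installed.

# Illustrative only: convert a table-schema field list into BigQuery SchemaFields.
from aircan.dependencies.google_cloud.bigquery_handler_v2 import bq_schema_from_table_schema

table_schema = [
    {"name": "nhs_number", "type": "string"},
    {"name": "visits", "type": "integer"},
    {"name": "recorded_on", "type": "date"},
]

print(bq_schema_from_table_schema(table_schema))
# Roughly: [SchemaField('nhs_number', 'STRING', 'NULLABLE'),
#           SchemaField('visits', 'NUMERIC', 'NULLABLE'),
#           SchemaField('recorded_on', 'DATE', 'NULLABLE')]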
38 changes: 37 additions & 1 deletion aircan/dependencies/utils.py
Original file line number Diff line number Diff line change
@@ -72,6 +72,42 @@ def days_ago(n, hour=0, minute=0, second=0, microsecond=0):
        time(hour, minute, second, microsecond, tzinfo=timezone.TIMEZONE),
    )

def aircan_status_update_nhs (site_url, ckan_api_key, status_dict):
    """
    Update aircan run status like pending, error, process, complete
    on ckan with message.
    """
    logging.info('Updating data loading status')
    try:
        request_data = {
            'dag_run_id': status_dict.get('dag_run_id', ''),
            'resource_id': status_dict.get('res_id', ''),
            'state': status_dict.get('state', ''),
            'last_updated': str(datetime.utcnow()),
            'message': status_dict.get('message', ''),
        }

        if status_dict.get('error', False):
            request_data.update({'error': {
                'message' : status_dict.get('error', '')
            }})

        url = urljoin(site_url, '/api/3/action/aircan_status_update')
        response = requests.post(url,
                        data=json.dumps(request_data),
                        headers={'Content-Type': 'application/json',
                                'Authorization': ckan_api_key})
        print(response.text)
        if response.status_code == 200:
            resource_json = response.json()
            logging.info('Loading status updated successfully in CKAN.')
            return {'success': True}
        else:
            print(response.json())
            return response.json()
    except Exception as e:
        logging.error('Failed to update status in CKAN. {0}'.format(e))
Comment on lines +75 to +109

⚠️ Potential issue

Fix unused variable, indentation, and consistency issues.

The new function has several issues that need to be addressed:

  1. Unused variable - resource_json is assigned but never used
  2. Indentation issues - continuation lines are not properly indented
  3. Inconsistent logging - uses print() statements while the existing aircan_status_update function doesn't
  4. Missing return statement - exception handler doesn't return anything

Apply this diff to fix the issues:

def aircan_status_update_nhs (site_url, ckan_api_key, status_dict):
    """
    Update aircan run status like pending, error, process, complete 
    on ckan with message.
    """
    logging.info('Updating data loading status')
    try:
        request_data = { 
            'dag_run_id': status_dict.get('dag_run_id', ''),
            'resource_id': status_dict.get('res_id', ''),
            'state': status_dict.get('state', ''),
            'last_updated': str(datetime.utcnow()),
            'message': status_dict.get('message', ''),
        }

        if status_dict.get('error', False):
            request_data.update({'error': {
                'message' : status_dict.get('error', '')
            }})

        url = urljoin(site_url, '/api/3/action/aircan_status_update')
        response = requests.post(url,
-                        data=json.dumps(request_data),
-                        headers={'Content-Type': 'application/json',
-                                'Authorization': ckan_api_key})
-        print(response.text)
+                         data=json.dumps(request_data),
+                         headers={'Content-Type': 'application/json',
+                                  'Authorization': ckan_api_key})
        if response.status_code == 200:
-            resource_json = response.json()
            logging.info('Loading status updated successfully in CKAN.')
            return {'success': True}
        else:
-            print(response.json())
            return response.json()
    except Exception as e:
        logging.error('Failed to update status in CKAN. {0}'.format(e))
+        return {'success': False, 'error': str(e)}
📝 Committable suggestion


Suggested change
def aircan_status_update_nhs (site_url, ckan_api_key, status_dict):
    """
    Update aircan run status like pending, error, process, complete
    on ckan with message.
    """
    logging.info('Updating data loading status')
    try:
        request_data = {
            'dag_run_id': status_dict.get('dag_run_id', ''),
            'resource_id': status_dict.get('res_id', ''),
            'state': status_dict.get('state', ''),
            'last_updated': str(datetime.utcnow()),
            'message': status_dict.get('message', ''),
        }
        if status_dict.get('error', False):
            request_data.update({'error': {
                'message' : status_dict.get('error', '')
            }})
        url = urljoin(site_url, '/api/3/action/aircan_status_update')
        response = requests.post(url,
                        data=json.dumps(request_data),
                        headers={'Content-Type': 'application/json',
                                'Authorization': ckan_api_key})
        print(response.text)
        if response.status_code == 200:
            resource_json = response.json()
            logging.info('Loading status updated successfully in CKAN.')
            return {'success': True}
        else:
            print(response.json())
            return response.json()
    except Exception as e:
        logging.error('Failed to update status in CKAN. {0}'.format(e))
def aircan_status_update_nhs (site_url, ckan_api_key, status_dict):
    """
    Update aircan run status like pending, error, process, complete
    on ckan with message.
    """
    logging.info('Updating data loading status')
    try:
        request_data = {
            'dag_run_id': status_dict.get('dag_run_id', ''),
            'resource_id': status_dict.get('res_id', ''),
            'state': status_dict.get('state', ''),
            'last_updated': str(datetime.utcnow()),
            'message': status_dict.get('message', ''),
        }
        if status_dict.get('error', False):
            request_data.update({'error': {
                'message': status_dict.get('error', '')
            }})
        url = urljoin(site_url, '/api/3/action/aircan_status_update')
        response = requests.post(
            url,
            data=json.dumps(request_data),
            headers={
                'Content-Type': 'application/json',
                'Authorization': ckan_api_key
            }
        )
        if response.status_code == 200:
            logging.info('Loading status updated successfully in CKAN.')
            return {'success': True}
        else:
            return response.json()
    except Exception as e:
        logging.error('Failed to update status in CKAN. {0}'.format(e))
        return {'success': False, 'error': str(e)}
🧰 Tools
🪛 Ruff (0.12.2)

102-102: Local variable resource_json is assigned to but never used

Remove assignment to unused variable resource_json

(F841)

🪛 Flake8 (7.2.0)

[error] 97-97: continuation line under-indented for visual indent

(E128)


[error] 98-98: continuation line under-indented for visual indent

(E128)


[error] 99-99: continuation line under-indented for visual indent

(E128)


[error] 102-102: local variable 'resource_json' is assigned to but never used

(F841)

🤖 Prompt for AI Agents
In aircan/dependencies/utils.py from lines 75 to 109, fix the
aircan_status_update_nhs function by removing the unused variable resource_json,
replacing print statements with logging calls for consistency, correcting the
indentation of continuation lines to align properly, and adding a return
statement in the exception handler to return a failure indication or error
details.


def aircan_status_update(site_url, ckan_api_key, status_dict):
    """
    Update aircan run status like pending, error, process, complete
@@ -325,4 +361,4 @@ def join_path(path, *paths):
    """
    for p in paths:
        path = os.path.join(path, p)
    return path
    return path