 )

 from loguru import logger
+import pydantic
 from pydantic import BaseModel
 import requests

 from datacustomcode.cmd import cmd_output
-from datacustomcode.scan import scan_file
-from datacustomcode.version import get_version

 if TYPE_CHECKING:
     from datacustomcode.credentials import Credentials
@@ -79,8 +78,10 @@ def _make_api_call(
     logger.debug(f"Request params: {kwargs}")

     response = requests.request(method=method, url=url, headers=headers, **kwargs)
-    response.raise_for_status()
     json_response = response.json()
+    if response.status_code >= 400:
+        logger.debug(f"Error Response: {json_response}")
+        response.raise_for_status()
     assert isinstance(
         json_response, dict
     ), f"Unexpected response type: {type(json_response)}"
@@ -225,89 +226,62 @@ def wait_for_deployment(
         callback(status)
         if status == "Deployed":
             logger.debug(
-                "Deployment completed, Elapsed time: {time.time() - start_time}"
+                f"Deployment completed.\nElapsed time: {time.time() - start_time}"
             )
             break
         time.sleep(1)


 DATA_TRANSFORM_REQUEST_TEMPLATE: dict[str, Any] = {
-    "metadata": {
-        "dbt_schema_version": "https://schemas.getdbt.com/dbt/manifest/v8.json",
-        "dbt_version": "1.4.6",
-        "generated_at": "2023-04-25T18:54:11.375589Z",
-        "invocation_id": "d6c68c69-533a-4d54-861e-1493d6cd8092",
-        "env": {},
-        "project_id": "jaffle_shop",
-        "user_id": "1ca8403c-a1a5-43af-8b88-9265e948b9d2",
-        "send_anonymous_usage_stats": True,
-        "adapter_type": "spark",
-    },
-    "nodes": {
-        "model.dcexample.dim_listings_w_hosts": {
-            "name": "dim_listings_w_hosts",
-            "resource_type": "model",
-            "relation_name": "{OUTPUT_DLO}",
-            "config": {"materialized": "table"},
-            "compiled_code": "",
-            "depends_on": {"nodes": []},
-        }
-    },
-    "sources": {
-        "source.dcexample.listings": {
-            "name": "listings",
-            "resource_type": "source",
-            "relation_name": "{INPUT_DLO}",
-            "identifier": "{INPUT_DLO}",
-        }
-    },
+    "nodes": {},
+    "sources": {},
     "macros": {
-        "macro.dcexample.byoc": {
-            "name": "byoc_example",
-            "resource_type": "macro",
-            "path": "",
-            "original_file_path": "",
-            "unique_id": "unique id",
-            "macro_sql": "",
-            "supported_languages": None,
+        "macro.byoc": {
             "arguments": [{"name": "{SCRIPT_NAME}", "type": "BYOC_SCRIPT"}],
         }
     },
 }


 class DataTransformConfig(BaseModel):
-    input: Union[str, list[str]]
-    output: Union[str, list[str]]
+    sdkVersion: str
+    entryPoint: str
+    dataspace: str
+    permissions: Permissions


-DATA_TRANSFORM_CONFIG_TEMPLATE: dict[str, Any] = {
-    "entryPoint": "entrypoint.py",
-    "dataspace": "default",
-    "permissions": {"read": {"dlo": ""}, "write": {"dlo": ""}},
-    "sdkVersion": get_version(),
-}
+class Permissions(BaseModel):
+    read: DloPermission
+    write: DloPermission


-def get_data_transform_config(directory: str) -> DataTransformConfig:
-    """Get the data transform config from the entrypoint.py file."""
-    entrypoint_file = os.path.join(directory, "entrypoint.py")
-    data_access_layer_calls = scan_file(entrypoint_file)
-    input_ = data_access_layer_calls.input_str
-    output = data_access_layer_calls.output_str
-    return DataTransformConfig(input=input_, output=output)
+class DloPermission(BaseModel):
+    dlo: list[str]


-def create_data_transform_config(directory: str) -> None:
-    """Create a data transform config.json file in the directory."""
-    data_transform_config = get_data_transform_config(directory)
-    request_hydrated = DATA_TRANSFORM_CONFIG_TEMPLATE.copy()
-    request_hydrated["permissions"]["read"]["dlo"] = data_transform_config.input
-    request_hydrated["permissions"]["write"]["dlo"] = data_transform_config.output
-    logger.debug(f"Creating data transform config in {directory}")
-    json.dump(
-        request_hydrated, open(os.path.join(directory, "config.json"), "w"), indent=4
-    )
+def get_data_transform_config(directory: str) -> DataTransformConfig:
+    """Get the data transform config from the config.json file."""
+    config_path = os.path.join(directory, "config.json")
+    try:
+        with open(config_path, "r") as f:
+            config = json.loads(f.read())
+        return DataTransformConfig(**config)
+    except FileNotFoundError as err:
+        raise FileNotFoundError(f"config.json not found at {config_path}") from err
+    except json.JSONDecodeError as err:
+        raise ValueError(f"config.json at {config_path} is not valid JSON") from err
+    except pydantic.ValidationError as err:
+        missing_fields = [str(e["loc"][0]) for e in err.errors()]
+        raise ValueError(
+            f"config.json at {config_path} is missing required "
+            f"fields: {', '.join(missing_fields)}"
+        ) from err
+
+
+def verify_data_transform_config(directory: str) -> None:
+    """Verify the data transform config.json contents."""
+    get_data_transform_config(directory)
+    logger.debug(f"Verified data transform config in {directory}")


 def create_data_transform(
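These models replace the scan-based config generation: config.json is now authored by the user and only validated here. A minimal sketch of a config.json that would satisfy DataTransformConfig — the field names come from the models above, while the version string and DLO names are placeholders, not taken from this change:

example_config = {
    "sdkVersion": "0.1.0",  # placeholder version
    "entryPoint": "entrypoint.py",
    "dataspace": "default",
    "permissions": {
        "read": {"dlo": ["Input_DLO__dll"]},    # hypothetical DLO names
        "write": {"dlo": ["Output_DLO__dll"]},
    },
}

DataTransformConfig(**example_config)  # validates; a missing field raises pydantic.ValidationError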
@@ -319,28 +293,31 @@ def create_data_transform(
     script_name = metadata.name
     data_transform_config = get_data_transform_config(directory)
     request_hydrated = DATA_TRANSFORM_REQUEST_TEMPLATE.copy()
-    request_hydrated["nodes"]["model.dcexample.dim_listings_w_hosts"][
-        "relation_name"
-    ] = data_transform_config.input
-    request_hydrated["sources"]["source.dcexample.listings"][
-        "relation_name"
-    ] = data_transform_config.output
-    request_hydrated["sources"]["source.dcexample.listings"][
-        "identifier"
-    ] = data_transform_config.output
-    request_hydrated["macros"]["macro.dcexample.byoc"]["arguments"][0][
-        "name"
-    ] = script_name
+
+    # Add nodes for each write DLO
+    for i, dlo in enumerate(data_transform_config.permissions.write.dlo, 1):
+        request_hydrated["nodes"][f"node{i}"] = {
+            "relation_name": dlo,
+            "config": {"materialized": "table"},
+            "compiled_code": "",
+        }
+
+    # Add sources for each read DLO
+    for i, dlo in enumerate(data_transform_config.permissions.read.dlo, 1):
+        request_hydrated["sources"][f"source{i}"] = {"relation_name": dlo}
+
+    request_hydrated["macros"]["macro.byoc"]["arguments"][0]["name"] = script_name

     body = {
         "definition": {
-            "type": "DBT",
+            "type": "DCSQL",
             "manifest": request_hydrated,
             "version": "56.0",
         },
         "label": f"{metadata.name}",
         "name": f"{metadata.name}",
         "type": "BATCH",
+        "dataSpaceName": data_transform_config.dataspace,
     }

     url = _join_strip_url(access_token.instance_url, DATA_TRANSFORMS_PATH)
@@ -359,7 +336,7 @@ def deploy_full(
 
     # prepare payload
     prepare_dependency_archive(directory)
-    create_data_transform_config(directory)
+    verify_data_transform_config(directory)

     # create deployment and upload payload
     deployment = create_deployment(access_token, metadata)
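For reference, a small sketch of what the hydration loops in create_data_transform produce for the placeholder permissions above — the DLO names are hypothetical, and the loop bodies simply mirror the diff:

# Stand-ins for data_transform_config.permissions.{read,write}.dlo
read_dlos = ["Input_DLO__dll"]
write_dlos = ["Output_DLO__dll"]

manifest: dict = {"nodes": {}, "sources": {}}
for i, dlo in enumerate(write_dlos, 1):
    manifest["nodes"][f"node{i}"] = {
        "relation_name": dlo,
        "config": {"materialized": "table"},
        "compiled_code": "",
    }
for i, dlo in enumerate(read_dlos, 1):
    manifest["sources"][f"source{i}"] = {"relation_name": dlo}

print(manifest["nodes"])    # {'node1': {'relation_name': 'Output_DLO__dll', ...}}
print(manifest["sources"])  # {'source1': {'relation_name': 'Input_DLO__dll'}}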