Skip to content

Commit

Permalink
Check if task_key provided is empty string or exceeds 100 characters
Browse files Browse the repository at this point in the history
  • Loading branch information
hardeybisey committed Dec 19, 2024
1 parent 57f227e commit a16df9a
Showing 1 changed file with 4 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -987,7 +987,7 @@ def __init__(
self,
caller: str = "DatabricksTaskBaseOperator",
databricks_conn_id: str = "databricks_default",
databricks_task_key: str | None = None,
databricks_task_key: str = "",
databricks_retry_args: dict[Any, Any] | None = None,
databricks_retry_delay: int = 1,
databricks_retry_limit: int = 3,
Expand Down Expand Up @@ -1046,14 +1046,14 @@ def databricks_task_key(self) -> str:

def _generate_databricks_task_key(self, task_id: str | None = None) -> str:
"""Create a databricks task key using the hash of dag_id and task_id."""
if self._databricks_task_key is None:
if not self._databricks_task_key or len(self._databricks_task_key) > 100:
self.log.info(
"No databricks_task_key provided. Generating task key using the hash value of dag_id+task_id."
"databricks_task_key has not be provided or the provided one exceeds 100 characters and will be truncated by the Databricks API. This will cause failure when trying to monitor the task. A task_key will be generated using the hash value of dag_id+task_id"
)
task_id = task_id or self.task_id
task_key = f"{self.dag_id}__{task_id}".encode()
self._databricks_task_key = hashlib.md5(task_key).hexdigest()
self.log.info("Generated databricks task key: %s", self._databricks_task_key)
self.log.info("Generated databricks task_key: %s", self._databricks_task_key)
return self._databricks_task_key

@property
Expand Down

0 comments on commit a16df9a

Please sign in to comment.