73 changes: 67 additions & 6 deletions providers/google/docs/connections/gcp_sql.rst
@@ -44,8 +44,9 @@ Schema (optional)
Login (required)
Specify the user name to connect.

Password (required)
Specify the password to connect.
Password (required unless IAM authentication is used)
Specify the password to connect. Leave it empty when using IAM authentication with either
``use_iam`` or ``sql_proxy_enable_iam_login``.

Extra (optional)
Specify the extra parameters (as JSON dictionary) that can be used in Google Cloud SQL
@@ -80,9 +81,16 @@ Extra (optional)
Configuring and using IAM authentication
----------------------------------------

The Google provider supports two IAM authentication paths:

* Direct IAM token authentication with ``use_iam``. Airflow generates a database login token and uses
it as the database password.
* Cloud SQL Auth Proxy IAM authentication with ``sql_proxy_enable_iam_login``. Airflow starts Cloud SQL
Auth Proxy with IAM database authentication enabled and connects with an empty password.

.. warning::
This functionality requires ``gcloud`` command (Google Cloud SDK) must be `installed
<https://cloud.google.com/sdk/docs/install>`_ on the Airflow worker.
Direct IAM token authentication with ``use_iam`` requires the ``gcloud`` command (Google Cloud SDK)
to be `installed <https://cloud.google.com/sdk/docs/install>`_ on the Airflow worker.

.. warning::
IAM authentication working only for Google Service Accounts.
@@ -101,15 +109,68 @@ Here are links describing what should be done before the start: `PostgreSQL
<https://cloud.google.com/sql/docs/postgres/iam-logins#before_you_begin>`_ and `MySQL
<https://cloud.google.com/sql/docs/mysql/iam-logins#before_you_begin>`_.

Configure ``gcpcloudsql`` connection with IAM enabling
""""""""""""""""""""""""""""""""""""""""""""""""""""""
Configure ``gcpcloudsql`` connection with direct IAM token authentication
"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""

For using IAM you need to enable ``"use_iam": "True"`` in the ``extra`` field. And specify IAM account in this format
``USERNAME@PROJECT_ID.iam.gserviceaccount.com`` in ``login`` field and empty string in the ``password`` field.
Do not combine ``use_iam`` with ``sql_proxy_enable_iam_login``.

For example:

.. exampleinclude:: /../../google/tests/system/google/cloud/cloud_sql/example_cloud_sql_query_iam.py
:language: python
:start-after: [START howto_operator_cloudsql_iam_connections]
:end-before: [END howto_operator_cloudsql_iam_connections]

Configure ``gcpcloudsql`` connection with Cloud SQL Auth Proxy IAM authentication
"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""

For using Cloud SQL Auth Proxy IAM authentication, enable ``"use_proxy": "True"`` and
``"sql_proxy_enable_iam_login": "True"`` in the ``extra`` field. With the current Cloud SQL Auth Proxy
v1 integration this option is supported for both Postgres and MySQL. Airflow passes
``-enable_iam_login`` to the proxy, so the ``password`` field can be empty.

Example "extras" field for Postgres:

.. code-block:: json

{
"database_type": "postgres",
"project_id": "example-project",
"location": "europe-west1",
"instance": "testinstance",
"use_proxy": true,
"sql_proxy_use_tcp": true,
"sql_proxy_enable_iam_login": true
}

Example "extras" field for MySQL:

.. code-block:: json

{
"database_type": "mysql",
"project_id": "example-project",
"location": "europe-west1",
"instance": "testinstance",
"use_proxy": true,
"sql_proxy_use_tcp": true,
"sql_proxy_enable_iam_login": true
}

.. note::
Cloud SQL for MySQL does not grant database-level privileges to IAM service-account users
automatically when the user is created. After creating the IAM service-account user (for example
via ``gcloud sql users create <user> --type=cloud_iam_service_account``) a database administrator
must grant the required privileges using SQL, for example
``GRANT SELECT ON <database>.* TO '<service-account-prefix>'@'%';``. This is a Cloud SQL operational
step and is outside the scope of Airflow. Cloud SQL for Postgres does not have this requirement
for the default ``public`` schema.

For example:

.. exampleinclude:: /../../google/tests/system/google/cloud/cloud_sql/example_cloud_sql_query_proxy_iam.py
:language: python
:start-after: [START howto_operator_cloudsql_proxy_iam_connections]
:end-before: [END howto_operator_cloudsql_proxy_iam_connections]
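The two JSON examples in the documentation above differ only in ``database_type``. A minimal sketch of producing that ``extra`` payload programmatically follows; ``build_proxy_iam_extras`` is a hypothetical helper written for illustration and is not part of the Google provider. The keys are the ones documented above, and the values are the placeholder ones from the examples.

```python
import json


def build_proxy_iam_extras(database_type, project_id, location, instance, use_tcp=True):
    """Return the ``extra`` JSON string for a proxy IAM connection (illustrative only)."""
    if database_type not in ("postgres", "mysql"):
        raise ValueError(f"Unsupported database_type: {database_type!r}")
    extras = {
        "database_type": database_type,
        "project_id": project_id,
        "location": location,
        "instance": instance,
        "use_proxy": True,
        "sql_proxy_use_tcp": use_tcp,
        # Proxy IAM login; deliberately no "use_iam" key, since the two are exclusive.
        "sql_proxy_enable_iam_login": True,
    }
    return json.dumps(extras, indent=2)


print(build_proxy_iam_extras("postgres", "example-project", "europe-west1", "testinstance"))
```

The resulting string would go into the connection's ``extra`` field, with the ``password`` field left empty.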
Original file line number Diff line number Diff line change
@@ -528,6 +528,8 @@ def __init__(
project_id: str = PROVIDE_PROJECT_ID,
sql_proxy_version: str | None = None,
sql_proxy_binary_path: str | None = None,
*,
sql_proxy_enable_iam_login: bool = False,
) -> None:
super().__init__()
self.path_prefix = path_prefix
@@ -540,6 +542,7 @@ def __init__(
self.instance_specification = instance_specification
self.project_id = project_id
self.gcp_conn_id = gcp_conn_id
self.sql_proxy_enable_iam_login = sql_proxy_enable_iam_login
self.command_line_parameters: list[str] = []
self.cloud_sql_proxy_socket_directory = self.path_prefix
self.sql_proxy_path = sql_proxy_binary_path or f"{self.path_prefix}_cloud_sql_proxy"
@@ -549,6 +552,8 @@ def __init__(
def _build_command_line_parameters(self) -> None:
self.command_line_parameters.extend(["-dir", self.cloud_sql_proxy_socket_directory])
self.command_line_parameters.extend(["-instances", self.instance_specification])
if self.sql_proxy_enable_iam_login:
self.command_line_parameters.append("-enable_iam_login")

@staticmethod
def _is_os_64bit() -> bool:
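The effect of the new flag on the proxy command line can be illustrated in isolation. This is a standalone sketch that mirrors the logic of ``_build_command_line_parameters`` in the hunk above; ``build_proxy_args`` is a hypothetical free function, and the socket directory and instance specification are made-up example values.

```python
from __future__ import annotations


def build_proxy_args(
    socket_directory: str,
    instance_specification: str,
    enable_iam_login: bool = False,
) -> list[str]:
    """Assemble Cloud SQL Auth Proxy v1 style arguments (illustrative only)."""
    args = ["-dir", socket_directory, "-instances", instance_specification]
    if enable_iam_login:
        # Only appended when proxy IAM login is requested, as in the diff.
        args.append("-enable_iam_login")
    return args


print(build_proxy_args("/tmp/proxy", "example-project:europe-west1:testinstance=tcp:5432", True))
```

Because the flag is appended last and only when requested, existing proxy invocations are unchanged when ``sql_proxy_enable_iam_login`` keeps its default of ``False``.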
@@ -788,6 +793,9 @@ class CloudSQLDatabaseHook(BaseHook):
You cannot use proxy and SSL together.
* **use_iam** - (default False) Whether IAM should be used to connect to Cloud SQL DB.
With using IAM password field should be empty string.
* **sql_proxy_enable_iam_login** - (default False) Whether Cloud SQL Auth Proxy should use
IAM database authentication. This requires ``use_proxy`` and is supported with the current
Cloud SQL Auth Proxy v1 integration for both Postgres and MySQL.
* **sql_proxy_use_tcp** - (default False) If set to true, TCP is used to connect via
proxy, otherwise UNIX sockets are used.
* **sql_proxy_version** - Specific version of the proxy to download (for example
@@ -852,15 +860,12 @@ def __init__(
self.use_proxy = self._get_bool(self.extras.get("use_proxy", "False"))
self.use_ssl = self._get_bool(self.extras.get("use_ssl", "False"))
self.use_iam = self._get_bool(self.extras.get("use_iam", "False"))
self.sql_proxy_enable_iam_login = self._get_bool(
self.extras.get("sql_proxy_enable_iam_login", "False")
)
self.sql_proxy_use_tcp = self._get_bool(self.extras.get("sql_proxy_use_tcp", "False"))
self.sql_proxy_version = self.extras.get("sql_proxy_version")
self.sql_proxy_binary_path = sql_proxy_binary_path
if self.use_iam:
self.user = self._get_iam_db_login()
self.password = self._generate_login_token(service_account=self.cloudsql_connection.login)
else:
self.user = cast("str", self.cloudsql_connection.login)
self.password = cast("str", self.cloudsql_connection.password)
self.public_ip = self.cloudsql_connection.host
self.public_port = self.cloudsql_connection.port
self.ssl_cert = ssl_cert
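The extras in this hunk are read with string defaults such as ``"False"``, while the JSON examples in the documentation use native booleans. The implementation of ``_get_bool`` is not shown in this diff, so the following is only an assumed sketch of a lenient parser that accepts both forms; ``parse_bool_extra`` is a hypothetical name and the provider's actual behaviour may differ.

```python
def parse_bool_extra(value) -> bool:
    """Interpret an extras value as a boolean (assumed semantics, illustrative only)."""
    if isinstance(value, bool):
        # Native JSON booleans pass through unchanged.
        return value
    # Anything else is compared case-insensitively against the string "true".
    return str(value).strip().lower() == "true"


extras = {"use_proxy": True, "sql_proxy_enable_iam_login": "True"}
print(parse_bool_extra(extras.get("use_proxy", "False")))
print(parse_bool_extra(extras.get("sql_proxy_enable_iam_login", "False")))
print(parse_bool_extra(extras.get("use_iam", "False")))
```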
@@ -876,7 +881,18 @@ def __init__(
# Generated based on clock + clock sequence. Unique per host (!).
# This is important as different hosts share the database
self.db_conn_id = str(uuid.uuid1())
# Validate before resolving user/password so invalid configs fail fast,
# without spawning the gcloud subprocess used by ``_generate_login_token``.
self._validate_inputs()
if self.use_iam:
self.user = self._get_iam_db_login()
self.password = self._generate_login_token(service_account=self.cloudsql_connection.login)
elif self.sql_proxy_enable_iam_login:
self.user = self._get_iam_db_login()
self.password = self.cloudsql_connection.password or ""
else:
self.user = cast("str", self.cloudsql_connection.login)
self.password = cast("str", self.cloudsql_connection.password)

@property
def sslcert(self) -> str | None:
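The three-way credential resolution added in this hunk can be read as a small pure function. The sketch below is written under assumptions: ``resolve_credentials`` is a hypothetical name, ``iam_db_login`` stands in for the result of ``_get_iam_db_login()`` (not shown in this diff), and ``token_factory`` stands in for ``_generate_login_token``.

```python
from __future__ import annotations

from typing import Callable


def resolve_credentials(
    login: str,
    password: str | None,
    *,
    use_iam: bool,
    sql_proxy_enable_iam_login: bool,
    iam_db_login: str,
    token_factory: Callable[[str], str],
) -> tuple[str, str | None]:
    """Pick the database user and password for the chosen auth path (illustrative only)."""
    if use_iam:
        # Direct IAM: a generated login token is used as the password.
        return iam_db_login, token_factory(login)
    if sql_proxy_enable_iam_login:
        # Proxy IAM: the proxy authenticates, so an empty password is acceptable.
        return iam_db_login, password or ""
    # Classic login/password from the connection.
    return login, password
```

In the hook itself this branch runs after ``_validate_inputs()``, so a configuration that sets both IAM options should be rejected before any token is generated.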
@@ -989,6 +1005,12 @@ def _validate_inputs(self) -> None:
" SSL is not needed as Cloud SQL Proxy "
"provides encryption on its own"
)
if self.use_iam and self.sql_proxy_enable_iam_login:
raise ValueError(
"use_iam (direct IAM token) and sql_proxy_enable_iam_login (proxy IAM) are mutually exclusive"
)
if self.sql_proxy_enable_iam_login and not self.use_proxy:
raise ValueError("sql_proxy_enable_iam_login requires use_proxy to be True")
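The two new checks can be exercised on their own. This is a standalone sketch of the same rules with the error messages copied from the diff; ``validate_iam_options`` is a hypothetical function, not the hook's ``_validate_inputs``.

```python
def validate_iam_options(*, use_proxy: bool, use_iam: bool, sql_proxy_enable_iam_login: bool) -> None:
    """Reject IAM option combinations that the diff treats as invalid (illustrative only)."""
    if use_iam and sql_proxy_enable_iam_login:
        raise ValueError(
            "use_iam (direct IAM token) and sql_proxy_enable_iam_login (proxy IAM) are mutually exclusive"
        )
    if sql_proxy_enable_iam_login and not use_proxy:
        raise ValueError("sql_proxy_enable_iam_login requires use_proxy to be True")


# A valid proxy IAM configuration passes silently.
validate_iam_options(use_proxy=True, use_iam=False, sql_proxy_enable_iam_login=True)

# Proxy IAM without the proxy is rejected.
try:
    validate_iam_options(use_proxy=False, use_iam=False, sql_proxy_enable_iam_login=True)
except ValueError as err:
    print(err)
```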
if any([self.ssl_key, self.ssl_cert, self.ssl_root_cert]) and self.ssl_secret_id:
raise AirflowException(
"Invalid SSL settings. Please use either all of parameters ['ssl_cert', 'ssl_cert', "
@@ -1073,7 +1095,7 @@ def _generate_connection_uri(self) -> str:
raise AirflowException("The login parameter needs to be set in connection")
if not self.public_ip:
raise AirflowException("The host parameter needs to be set in connection")
if not self.password:
if not self.password and not self.sql_proxy_enable_iam_login:
raise AirflowException("The password parameter needs to be set in connection")
if not self.database:
raise AirflowException("The database parameter needs to be set in connection")
@@ -1136,7 +1158,7 @@ def _generate_connection_parameters(self) -> dict:
raise AirflowException("The login parameter needs to be set in connection")
if not self.public_ip:
raise AirflowException("The host parameter needs to be set in connection")
if not self.password:
if not self.password and not self.sql_proxy_enable_iam_login:
raise AirflowException("The password parameter needs to be set in connection")
if not self.database:
raise AirflowException("The database parameter needs to be set in connection")
@@ -1227,6 +1249,7 @@ def get_sqlproxy_runner(self) -> CloudSqlProxyRunner:
sql_proxy_version=self.sql_proxy_version,
sql_proxy_binary_path=self.sql_proxy_binary_path,
gcp_conn_id=self.gcp_conn_id,
sql_proxy_enable_iam_login=self.sql_proxy_enable_iam_login,
)

def get_database_hook(self, connection: Connection) -> DbApiHook: