Introduce notion of dialects in DbApiHook #41327
Conversation
@eladkal and @potiuk what do you think about this implementation? I would ideally want to use entry points to register the dialects, so that additional dialects can be loaded through different providers, but I couldn't find such a mechanism in the Airflow code base. Apparently, according to ChatGPT, it is possible to do it either via a setup.py or via pyproject.toml. So if you have any suggestions on that, they would be helpful, unless you have other ideas or don't like the implementation at all, of course. At the moment the dialects (for now there is only MSSQL) are registered in a hard-coded way in the common-sql provider to make it work; it would be nice if those were dynamically registered from the dedicated providers. Below the answer from ChatGPT:
We actually already use entrypoints. You can also see a description of it in https://airflow.apache.org/docs/apache-airflow-providers/howto/create-custom-providers.html#custom-provider-packages Generally speaking you will have to do a few things:
Once you do it all, it should just work. Another benefit of doing it this way is that ProvidersManager will see whether the provider is available directly in airflow sources when you run it.
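For illustration, here is a minimal sketch of how a custom provider package could expose dialects through its provider metadata, in the spirit of the custom-provider mechanism linked above. The package name, module paths and the exact keys of the `dialects` section are assumptions for the example and may differ from what the final implementation uses:

```python
# get_provider_info.py in a hypothetical custom provider package.
# The package's metadata points the "apache_airflow_provider" entry point at this
# function so ProvidersManager can discover the provider (and its dialects) at runtime.
def get_provider_info():
    return {
        "package-name": "acme-airflow-provider-mydb",  # hypothetical package
        "name": "ACME MyDB",
        "description": "Provider exposing a custom SQL dialect.",
        "versions": ["1.0.0"],
        # Assumed schema: a list of dialect registrations, mirroring how the
        # mssql/postgres providers register theirs in provider.yaml in this PR.
        "dialects": [
            {
                "dialect-type": "mydb",
                "dialect-class-name": "acme.mydb.dialects.mydb.MyDbDialect",
            }
        ],
    }
```

With the entry point in place, discovery would work the same way as for hooks and connections, without hard-coding anything in the common-sql provider.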
Thank you @potiuk for the explanation, I will have a look at it and see how to implement it for the common sql provider. Another question: at the moment the MsSqlDialect class is also located within the common sql provider just so that it works, but where would you put it?
I think in the mssql provider. We do have a few "loosely related" things in providers already and cross-provider dependencies (both explicit and implicit), and sometimes when things are "in between" we make arbitrary decisions. When we introduced providers we implemented a bit of a "soft and not 100% compilable" rule on where to put such things, and the rule is .... "Where it would likely be maintained best by the major stakeholder of the provider - assuming there is one". For example, when we have an S3 -> GCS transfer operator we put it in the Google provider, and GCS -> S3 we put in the Amazon provider. The very "soft" and "political" reason for that is that Amazon will be keen on bringing data from GCS and migrating people over from Google, and Google will be keen on bringing the data from S3 to GCS. Not very scientific, I know, and sometimes you can argue the decision, but it's the best "rule of thumb" we can have on that. So assuming that Microsoft (which is not happening BTW) is the key stakeholder in the mssql provider, the MsSqlDialect belongs there.
Thank you @jarek for your fast and elaborate answer, what you are saying makes sense and is indeed logical. Will try to implement those changes as quickly as possible. Thanks again :)
I'm getting the following error, I know what it means but I don't understand what causes it:
This is really nice - finally had a chance to review it. @dabla - can you rebase it please :)
LGTM still in 2024 :-( [CET time zone at least]
Well. We have the saying ... what happens for you on New Year's Eve - you will do the whole year :) Happy New Year
@dabla it would be great to add docs to explain what dialects are, what problem they solve and how to use them. Most of this information is already mentioned in the PR so it just needs to be added to the common.sql docs
Good point!
Yeah, will create a new PR for that. I also need to explain the new Airflow config parameter (disabled by default) which automatically fills the target_fields if not specified. Could one of you guys also check my question in the other PR regarding the GenericTransfer operator?
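For reference, the flag mentioned above appears in this PR's commits as `core.dbapihook_resolve_target_fields`, defaulting to False so the original behaviour is preserved. A small illustrative snippet (not code from the PR) of how such a flag would be read:

```python
from airflow.configuration import conf

# core.dbapihook_resolve_target_fields: when True, DbApiHook resolves target_fields
# automatically (the dialect's insertable columns) instead of requiring the caller
# to pass them explicitly; when False (default) nothing changes.
if conf.getboolean("core", "dbapihook_resolve_target_fields", fallback=False):
    print("DbApiHook will auto-resolve target_fields when they are not specified")
```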
Can you rebase it and mention us there :) ? Then it will be bumped on top of my list :)
Yup just did it now thx |
* refactor: Added unit test for handlers module in mssql * refactor: Added unit test for Dialect class * refactor: Reformatted unit test of Dialect class * fix: Added missing import of TYPE_CHECKING * refactor: Added dialects in provider schema and moved MsSqlDialect to Microsoft mssql provider * refactor: Removed duplicate handlers and import them from handlers module * refactor: Fixed import of TYPE_CHECKING in mssql hook * refactor: Fixed some static checks and imports * refactor: Dialect should be defined as an array in provider.yaml, not a single element * refactor: Fixed default dialect name for common sql provider * refactor: Fixed dialect name for Microsoft MSSQL provider * refactor: Fixed module for dialect in pyton-modules of common sql provider * refactor: Dialect module is not part of hooks * refactor: Moved unit tests for default Dialect to common sql provider instead of Microsoft MSSQL provider * refactor: Added unit test for MsSqlDialect * refactor: Reformatted TestMsSqlDialect * refactor: Implemented dialect resolution using the ProvidersManagers in DbApiHook * refactor: Updated comment in dialects property * refactor: Added dialects lists command * refactor: Removed unused code from _import_hook method * refactor: Reformatted _discover_provider_dialects method in ProvidersManager * refactor: Removed unused imports from MsSqlHook * refactor: Removed dialects from DbApiHook definition * refactor: Reformatted _discover_provider_dialects * refactor: Renamed module for TestMsSqlDialect * refactor: test_generate_replace_sql in TestMsSqlHook should only be tested on Airflow 2.10 or higher * refactor: Updated expected merge into statement * refactor: Only run test_generate_replace_sql on TestMsSqlDialect when Airflow is higher than 2.10 * refactor: generate_insert_sql based on dialects should only be tested on Airflow 3.0 or higher * refactor: Updated reason in skipped tests * refactor: Removed locking in merge into * refactor: Added kwargs to constructor of Dialect to make it future proof if additional arguments would be needed in the future * refactor: Removed row locking clause in generated replace sql statement and removed pyi file for mssql dialect * refactor: Implemented PostgresDialect * fix: Fixed constructor Dialect * refactor: Register PostgresDialect in providers.yaml and added unit test for PostgresDialect * refactor: PostgresHook now uses the dialect to generate statements and get primary_keys * refactor: Refactored DbApiHook * refactor: Refactored the dialect_name mechanism in DbApiHook, override it in specialized Hooks * refactor: Fixed some static checks * refactor: Fixed dialect.pyi * refactor: Refactored how dialects are resolved, if not found always fall back to default * refactor: Reformatted dialect method in DbApiHook * refactor: Changed message in raised exception of dialect method when not found * refactor: Added missing get_records method in Dialect definition * refactor: Fixed some static checks and mypy issues * refactor: Raise ValueError if replace_index doesn't exist * refactor: Increased version of apache-airflow-providers-common-sql to 1.17.1 for mssql and postgres * refactor: Updated dialect.pyi * refactor: Updated provider dependencies * refactor: Incremented version of apache-airflow-providers-common-sql in test_get_install_requirements * refactor: Reformatted get_records method * refactor: Common sql provider must depend on latest Airflow version to be able to discover dialects through ProvidersManager * refactor: Updated provider dependencies * 
Revert "refactor: Updated provider dependencies" This reverts commit 2b591f2. * Revert "refactor: Common sql provider must depend on latest Airflow version to be able to discover dialects through ProvidersManager" This reverts commit cb2d043. * refactor: Added get_dialects method in DbAPiHook which contains fallback code for Airflow 2.8.x so the provider can still be used with Airflow versions prior to 3.0.0 * fix: get_dialects isn't a property but a method * refactor: Refactored get_dialects in DbAPiHook and added unit tests * refactor: Added unit tests for MsSqlHook related to dialects * refactor: Added unit tests for PostgresHook related to dialects * refactor: Fixed some static checks * refactor: Removed get_dialects method as this wasn't backward compatible, avoid importing DialectInfo until min required airflow version is 3.0.0 or higher * refactor: Re-added missing deprecated methods for backward compatibility * refactor: Added resolve_dialects method in sql.pyi * refactor: Reorganized imports in sql module * refactor: Fixed definition of resolve_dialects in sql.pyi * refactor: Fixed TestDialect * refactor: Fixed DbAPi tests and moved tests from DbAPi to Odbc * refactor: Ignore flake8 F811 error as those redefinitions are there for backward compatibility * refactor: Move import of Dialect under TYPE_CHECKING block * refactor: Fixed TestMsSqlDialect * refactor: Fixed TestPostgresDialect * refactor: Reformatted MsSqlHook * refactor: Added docstring on placeholder property * refactor: If no dialect is found for given dialect name, then return default Dialect * refactor: Try ignoring flake8 F811 error as those redefinitions are there for backward compatibility * refactor: Moved Dialect out of TYPE_CHECKING block * fix: Fixed definition location of dialect in dialect.pyi * fix: Fixed TestTeradataHook * refactor: Marked test_when_provider_min_airflow_version_is_3_0_or_higher_remove_obsolete_code as db test * refactor: Removed handler definitions from sql.pyi * Revert "refactor: Removed handler definitions from sql.pyi" This reverts commit a93d73c. * refactor: Removed white line * refactor: Removed duplicate imports if handlers * refactor: Fixed some static checks * refactor: Changed logging level of generated sql statement to INFO in DbApiHook * Revert "refactor: Changed logging level of generated sql statement to INFO in DbApiHook" This reverts commit c30feaf. 
* fix: Moved dialects to correct providers location * fix: Deleted old providers location * fix: Re-added missing dialects for mssql and postgres * fix: Fixed 2 imports for providers tests * refactored: Reorganized some imports * refactored: Fixed dialect and sql types * refactored: Fixed import of test_utils in test_dag_run * refactored: Added white line in imports of test_dag_run * refactored: Escape reserved words as column names * refactored: Fixed initialisation of Dialects * refactored: Renamed escape_reserved_word to escape_column_name * refactored: Reformatted TestMsSqlDialect * refactored: Fixed constructor definition Dialect * refactored: Fixed TestDbApiHook * refactored: Removed get_reserved_words from dialect definition * refactored: Added logging in get_reserved_words method * refactor: Removed duplicate reserved_words property in DbApiHook * refactor: Fixed invocation of reserved_words property and changed name of postgres dialect to postgresql like in sqlalchemy * refactor: Removed override generate_insert_sql method in PostgresDialect as it doesn't do anything different than the existing one in Dialect * refactor: Added unit tests for _generate_insert_sql methods on MsSqlHook and PostgresHook * refactor: Reformatted test mssql and test postgres * refactor: Fixed TestPostgresDialect * refactor: Refactored get_reserved_words * refactor: Added escape column name format so we can customize it if needed * refactor: Suppress NoSuchModuleError exception when trying to load dialect from sqlalchemy to get reserved words * refactor: Removed name from Dialect and added unit test for dialect name in JdbcHook * refactor: Fixed parameters in get_column_names method of Dialect * refactor: Added missing optional schema parameter to get_primary_keys method of MsSqlDialect * refactor: Fixed TestDialect * refactor: Fixed TestDbApiHook * refactor: Fixed TestMsSqlDialect * refactor: Reformatted test_generate_replace_sql * refactor: Fixed dialect in MsSqlHook and PostgresHook * refactor: Fixed TestPostgresDialect * refactor: Mark TestMySqlHook as a db test * refactor: Fixed test_generate_insert_sql_with_already_escaped_column_name in TestPostgresHook * refactor: Reactivate postgres backend in TestPostgresHook * refactor: Removed name param of constructor in Dialect definition * refactor: Reformatted imports for TestMySqlHook * refactor: Fixed import of get_provider_min_airflow_version in test sql * refactor: Override default escape_column_name_format for MySqlHook * refactor: Fixed tests in TestMySqlHook * refactor: Refactored INSERT_SQL_STATEMENT constant in TestMySqlHook * refactor: When using ODBC, we should also use the odbc connection when creating an sqlalchemy engine * refactor: Added get_target_fields in Dialect which only returns insertable column_names and added core.dbapihook_resolve_target_fields configuration parameter to allow to specify if we want to resolve target_fields automatically or not * refactor: By default the core.dbapihook_resolve_target_fields configuration parameter should be False so the original behaviour is respected * refactor: Added logging statement for target_fields in Dialect * refactor: Moved _resolve_target_fields as static field of DbApiHook and fixed TestMsSqlHook * refactor: Added test for get_sqlalchemy_engine in OdbcHook * refactor: Reformatted teardown method * Revert "refactor: Added test for get_sqlalchemy_engine in OdbcHook" This reverts commit 871e96b. 
* refactor: Remove patched get_sql_alchemy method in OdbcHook, will fix this in dedicated PR * refactor: Removed quotes from schema and table_name before invoking sqlalchemy inspector methods * refactor: Removed check in test_sql for Airflow 2.8 plus as it is already at that min required version * refactor: Fixed get_primary_keys method in PostgresDialect * refactor: Reformatted get_primary_keys method of PostgresDialect * refactor: extract_schema_from_table is now a public classmethod of Dialect * fix: extract_schema_from_table is now a public classmethod of Dialect * refactor: Reorganized imports * refactor: Reorganized imports dialect and postgres * refactor: Fixed test_dialect in TestProviderManager * refactor: Removed operators section from provider.yaml in mssql and postgres * refactor: Removed unused imports in postgres hook * refactor: Added missing import for AirflowProviderDeprecationWarning * refactor: Added rowlock option in merge into statement for MSSQL * refactor: Updated expected replace statement for MSSQL --------- Co-authored-by: David Blain <[email protected]> Co-authored-by: David Blain <[email protected]>
This PR introduces the notion of an SQL Dialect class in the DbApiHook.
Let me first elaborate why I took this approach; this PR is a proposition and can of course be changed where needed, but it gives an idea of how it could be implemented. The reason I wanted to introduce this is that I experienced that the `_insert_statement_format` and `_replace_statement_format` string formatting properties in the `DbApiHook` are a bit of a naïve approach. Yes, this approach is generic and doesn't tie the code to a specific database, hence why I want to introduce some kind of dialect, but it is also limited because the parameters passed to the string format are hard-coded and aren't always sufficient for certain databases.
We, for example, use MSSQL and SAP Hana a lot in our DAGs, and since we use the `insert_rows` method as much as possible for inserting/updating data, we liked the approach of the `replace` parameter, which allows you to specify whether you want to insert or replace data. That way all our insert/replace tasks in our DAGs use the same generic code independently of which database we write to. We also use the `GenericTransfer` operator across multiple databases that way, which is great. Besides that, we also implemented a custom `SQLInsertRowsOperator` which does the same as the `GenericTransfer` operator but gets the data from an XCom instead of another database; I'll also propose a PR for that once we have a good solution for the problem we are trying to solve here.
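As an illustration of the usage pattern described above (the connection id, table and column names are made up for the example):

```python
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook

# The same generic call regardless of the target database: replace=True asks the hook
# to generate an "upsert"-style statement instead of a plain INSERT.
hook = MsSqlHook(mssql_conn_id="mssql_dwh")  # hypothetical connection id
hook.insert_rows(
    table="dbo.customers",                   # hypothetical table
    rows=[(1, "Alice"), (2, "Bob")],
    target_fields=["id", "name"],
    replace=True,
)
```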
So the issue with the current approach for automatically generating the replace statement shows up when we want to use MSSQL. For SAP Hana it was easy: we just had to override the `replace_statement_format` parameter in the connection extra field and we were done. For MSSQL that was not possible, which is why I created another PR, already merged in Airflow, for the `MsSqlHook`, in which I override the `_generate_insert_sql` method of the `DbApiHook` to allow a more complex replace statement generation that supports the replace option.
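A simplified sketch of that kind of override, not the exact code from the merged PR; the primary-key handling (here a `primary_keys` kwarg with a made-up default) and the quoting are reduced to the bare minimum for illustration:

```python
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook


class UpsertingMsSqlHook(MsSqlHook):
    """Illustrative only: emit a MERGE statement when replace=True is requested."""

    def _generate_insert_sql(self, table, values, target_fields, replace: bool = False, **kwargs) -> str:
        if not replace:
            return super()._generate_insert_sql(table, values, target_fields, replace, **kwargs)
        primary_keys = kwargs.get("primary_keys", ["id"])  # assumption for the sketch
        placeholders = [self.placeholder] * len(target_fields)
        source_cols = ", ".join(f"{p} AS {c}" for p, c in zip(placeholders, target_fields))
        on_clause = " AND ".join(f"target.{k} = source.{k}" for k in primary_keys)
        update_cols = [c for c in target_fields if c not in primary_keys]
        set_clause = ", ".join(f"target.{c} = source.{c}" for c in update_cols)
        insert_cols = ", ".join(target_fields)
        insert_vals = ", ".join(f"source.{c}" for c in target_fields)
        return (
            f"MERGE INTO {table} AS target "
            f"USING (SELECT {source_cols}) AS source ON {on_clause} "
            f"WHEN MATCHED THEN UPDATE SET {set_clause} "
            f"WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals});"
        )
```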
The problem with this approach is that the functionality only works when using the MSSQL connection type (which underneath uses the pymssql library). Unfortunately, we experienced a lot of connection issues and even deadlocks (the latter may be unrelated) when writing data to MSSQL using the `MsSqlHook`. Our DBA suggested using the `OdbcHook`; unfortunately, the `OdbcHook` doesn't have the enhanced `_generate_insert_sql` for MSSQL, as the `OdbcHook` is agnostic of which database you connect to. So in order to check whether the `OdbcHook` would solve our connection issues with MSSQL, I temporarily patched its `_generate_insert_sql` method to test whether the issues go away when using the `OdbcHook`.
So we did a load test, and indeed we did not experience any connection issues or deadlocks, and the performance was also a lot better compared to the `MsSqlHook`. With the `MsSqlHook` we had to specify the `max_active_tis_per_dag=1` parameter to prevent concurrent tasks and avoid deadlocks; with the `OdbcHook` we didn't have to, but once again this could be pure coincidence given the concurrent nature of the problem.
So that's why I started thinking about a more generic approach to this problem, one that works independently of which connection type you use, and about moving all the logic to the `DbApiHook`, hence why I wanted to introduce the notion of a `Dialect` which can then be specialized per database where needed. Of course we would need to think about how we want it to be implemented; here I just did a bit of a naïve implementation to show the idea. We are already using this in our Airflow environment through patched code and it works great; now we have to find a way to implement this idea (if accepted) in a good, clean and future-proof way.
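To make the idea above concrete, here is a minimal sketch of what such a `Dialect` abstraction and its resolution could look like. This is an illustration of the concept only, not the code in this PR; the method names, the module-level registry and the resolution helper are assumptions based on the description:

```python
from __future__ import annotations


class Dialect:
    """Default, database-agnostic SQL generation used when no specialized dialect exists."""

    def __init__(self, hook, **kwargs):
        self.hook = hook

    def generate_insert_sql(self, table: str, values, target_fields: list[str]) -> str:
        placeholders = ", ".join([self.hook.placeholder] * len(target_fields))
        columns = ", ".join(target_fields)
        return f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"

    def generate_replace_sql(self, table: str, values, target_fields: list[str]) -> str:
        # Generic fallback, kept close to the old replace_statement_format behaviour.
        return self.generate_insert_sql(table, values, target_fields).replace("INSERT", "REPLACE", 1)


class MsSqlDialect(Dialect):
    """Specialization: MSSQL has no REPLACE INTO, so a MERGE statement is built instead."""

    def generate_replace_sql(self, table: str, values, target_fields: list[str]) -> str:
        # The real implementation would inspect primary keys via the hook; elided here.
        raise NotImplementedError("see the MERGE sketch earlier in this description")


# Hypothetical resolution inside DbApiHook: look up the dialect registered by a provider
# (e.g. discovered through ProvidersManager) under the hook's dialect name, and fall back
# to the default Dialect when no specialized one is found.
_DIALECTS: dict[str, type[Dialect]] = {"mssql": MsSqlDialect}


def resolve_dialect(hook, dialect_name: str) -> Dialect:
    return _DIALECTS.get(dialect_name, Dialect)(hook)
```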
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named `{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in newsfragments.