feat(ingestion/jdbc-connector): Adding a connector for JDBC connections with customizable platform and sqlglot dialect. #12249

Draft
wants to merge 34 commits into
base: master
f7fc52d
Create jdbc.py
acrylJonny Dec 16, 2024
4be382f
Update jdbc.py
acrylJonny Dec 18, 2024
51c99eb
Update jdbc.py
acrylJonny Dec 18, 2024
2e468d8
Update jdbc.py
acrylJonny Dec 19, 2024
1827551
Update jdbc.py
acrylJonny Dec 24, 2024
8904537
Update jdbc.py
acrylJonny Dec 24, 2024
1117bd2
Update jdbc.py
acrylJonny Dec 24, 2024
7d001a9
Update jdbc.py
acrylJonny Dec 27, 2024
172b4f6
Update jdbc.py
acrylJonny Dec 27, 2024
1422677
Merge branch 'datahub-project:master' into generic-jdbc-connector
acrylJonny Dec 27, 2024
fea9e49
Update jdbc.py
acrylJonny Dec 30, 2024
fd19ea0
Merge branch 'datahub-project:master' into generic-jdbc-connector
acrylJonny Dec 30, 2024
336bd5e
Update jdbc.py
acrylJonny Dec 30, 2024
875067d
moving to folder
acrylJonny Dec 30, 2024
33895b9
tests
acrylJonny Dec 30, 2024
b5023f1
tests and linting
acrylJonny Dec 31, 2024
d6619d9
linting
acrylJonny Dec 31, 2024
eea1cb0
Merge branch 'master' into generic-jdbc-connector
acrylJonny Dec 31, 2024
23494b9
remove mce files
acrylJonny Dec 31, 2024
d58f33c
test improvement and dataset subtype
acrylJonny Dec 31, 2024
330fefd
docs
acrylJonny Dec 31, 2024
1fe0e80
platform_name
acrylJonny Dec 31, 2024
773cc07
Update source.py
acrylJonny Dec 31, 2024
8916dec
Update source.py
acrylJonny Dec 31, 2024
141b7b7
update setup for jdbc dependencies
acrylJonny Dec 31, 2024
d0b0225
table key and stored proc improvements
acrylJonny Jan 2, 2025
0e9dd71
Merge branch 'master' into generic-jdbc-connector
acrylJonny Jan 2, 2025
885f097
more modularity
acrylJonny Jan 2, 2025
c2c027f
renaming of files
acrylJonny Jan 3, 2025
dc14690
remove profiling config
acrylJonny Jan 3, 2025
92cfcfa
testing updates
acrylJonny Jan 3, 2025
5ae583e
Merge branch 'master' into generic-jdbc-connector
acrylJonny Jan 3, 2025
ac5f8ff
updates to tests and dependencies
acrylJonny Jan 3, 2025
ba9b6cd
Merge branch 'master' into generic-jdbc-connector
acrylJonny Feb 14, 2025
57 changes: 57 additions & 0 deletions metadata-ingestion/docs/sources/jdbc/jdbc.md
@@ -0,0 +1,57 @@
### Starter Recipe

```yaml
source:
  type: jdbc
  config:
    # Name of the platform being ingested (required by the source config)
    platform: "postgres"

    # JDBC Driver Configuration
    driver:
      driver_class: org.postgresql.Driver # Replace with your database's driver class
      # Either specify maven_coordinates or driver_path
      maven_coordinates: org.postgresql:postgresql:42.7.1
      # driver_path: "/path/to/driver.jar"

    # Connection Configuration
    connection:
      uri: "jdbc:postgresql://localhost:5432/mydb" # Replace with your database URI
      username: "user"
      password: "pass"

      # Optional SSL Configuration
      ssl_config:
        cert_path: "/path/to/cert"
        # cert_type: "pem" # pem, jks, or p12
        # cert_password: ""

      # Additional JDBC Properties
      properties:
        applicationName: "datahub_jdbc_ingestion"

    # Additional JVM Arguments
    jvm_args:
      - "-Xmx1g"
      - "-Djavax.net.ssl.trustStore=/etc/ssl/certs/java/cacerts"

    # Optional: SQL dialect for query parsing
    sqlglot_dialect: "postgres" # Replace with your database's dialect

    # Optional Filters
    schema_pattern:
      allow:
        - "schema1"
        - "schema2"

    table_pattern:
      allow:
        - "schema1.table1"
      deny:
        - "schema1.temp_.*"

    # Feature flags
    include_tables: true
    include_views: true
    include_stored_procedures: false

sink:
  # sink configs
```
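The `table_pattern` entries in the recipe are regular expressions, not literal names. The sketch below approximates how allow/deny filtering of this kind usually behaves. It assumes patterns are applied with `re.match` (anchored at the start only) and case-insensitively, and that deny patterns take precedence. `is_allowed` is a hypothetical helper written for illustration, not the connector's code.

```python
import re
from typing import List


def is_allowed(name: str, allow: List[str], deny: List[str]) -> bool:
    """Approximate allow/deny filtering: a deny match wins, then any allow must match."""
    # Assumption: patterns are matched from the start of the name, ignoring case.
    if any(re.match(pattern, name, re.IGNORECASE) for pattern in deny):
        return False
    return any(re.match(pattern, name, re.IGNORECASE) for pattern in allow)


allow = ["schema1.table1"]
deny = ["schema1.temp_.*"]

# The pattern is not anchored at the end, so "schema1.table10" also passes.
for candidate in ["schema1.table1", "schema1.table10", "schema2.table1"]:
    print(candidate, is_allowed(candidate, allow, deny))
```

If an exact match is intended, ending the pattern with `$` (for example `schema1\.table1$`) avoids the prefix match shown for `schema1.table10`.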
25 changes: 25 additions & 0 deletions metadata-ingestion/docs/sources/jdbc/jdbc_pre.md
@@ -0,0 +1,25 @@
### Setup

This integration pulls metadata from databases via JDBC connections. It supports various database systems through their respective JDBC drivers.

You'll need:
1. A running database instance that you want to connect to
2. The appropriate JDBC driver for your database
3. Valid credentials with permissions to read metadata

#### Steps to Get Started

1. **JDBC Driver Setup**:
   - Option 1: Download the JDBC driver JAR file for your database
   - Option 2: Use Maven coordinates to automatically download the driver

2. **Permissions Required**:
   - READ access to system catalogs/metadata views
   - Ability to execute metadata queries
   - Access to relevant schemas and tables

3. **Connection Information**:
   - JDBC connection URL
   - Username and password (if using basic authentication)
   - SSL configuration (if required)
   - Any additional JDBC properties needed for your specific database
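For the Maven option, coordinates of the form `groupId:artifactId:version` map onto a predictable JAR location in a standard Maven repository layout. The sketch below shows that mapping. It assumes Maven Central as the repository, which this document does not state, and `maven_jar_url` is a hypothetical helper rather than part of the connector.

```python
def maven_jar_url(
    coordinates: str,
    repo: str = "https://repo1.maven.org/maven2",  # assumption: Maven Central
) -> str:
    """Build the JAR URL for 'groupId:artifactId:version' coordinates."""
    group_id, artifact_id, version = coordinates.split(":")
    # Dots in the groupId become path separators in the repository layout.
    group_path = group_id.replace(".", "/")
    return f"{repo}/{group_path}/{artifact_id}/{version}/{artifact_id}-{version}.jar"


print(maven_jar_url("org.postgresql:postgresql:42.7.1"))
```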
53 changes: 53 additions & 0 deletions metadata-ingestion/docs/sources/jdbc/jdbc_recipe.yml
@@ -0,0 +1,53 @@
source:
  type: jdbc
  config:
    # Name of the platform being ingested (required by the source config)
    platform: "postgres"

    # JDBC Driver Configuration
    driver:
      driver_class: org.postgresql.Driver # Replace with your database's driver class
      # Either specify maven_coordinates or driver_path
      maven_coordinates: org.postgresql:postgresql:42.7.1
      # driver_path: "/path/to/driver.jar"

    # Connection Configuration
    connection:
      uri: "jdbc:postgresql://localhost:5432/mydb" # Replace with your database URI
      username: "user"
      password: "pass"

      # Optional SSL Configuration
      ssl_config:
        cert_path: "/path/to/cert"
        # cert_type: "pem" # pem, jks, or p12
        # cert_password: ""

      # Additional JDBC Properties
      properties:
        applicationName: "datahub_jdbc_ingestion"

    # Additional JVM Arguments
    jvm_args:
      - "-Xmx1g"
      - "-Djavax.net.ssl.trustStore=/etc/ssl/certs/java/cacerts"

    # Optional: SQL dialect for query parsing
    sqlglot_dialect: "postgres" # Replace with your database's dialect

    # Optional Filters
    schema_pattern:
      allow:
        - "schema1"
        - "schema2"

    table_pattern:
      allow:
        - "schema1.table1"
      deny:
        - "schema1.temp_.*"

    # Feature flags
    include_tables: true
    include_views: true
    include_stored_procedures: false

sink:
  # sink configs
7 changes: 7 additions & 0 deletions metadata-ingestion/setup.py
@@ -450,6 +450,11 @@
| pyhive_common
| {"psycopg2-binary", "pymysql>=1.0.2"},
"iceberg": iceberg_common,

"jdbc": usage_common
| classification_lib
| sqlglot_lib
| {"beautifulsoup4", "JayDeBeApi", "JPype1", "requests"},
"iceberg-catalog": aws_common,
"json-schema": set(),
"kafka": kafka_common | kafka_protobuf,
@@ -636,6 +641,7 @@
"iceberg",
"iceberg-catalog",
"mlflow",
"jdbc",
"json-schema",
"ldap",
"looker",
@@ -743,6 +749,7 @@
"hana = datahub.ingestion.source.sql.hana:HanaSource",
"hive = datahub.ingestion.source.sql.hive:HiveSource",
"hive-metastore = datahub.ingestion.source.sql.hive_metastore:HiveMetastoreSource",
"jdbc = datahub.ingestion.source.jdbc.source:JDBCSource",
"json-schema = datahub.ingestion.source.schema.json_schema:JsonSchemaSource",
"kafka = datahub.ingestion.source.kafka.kafka:KafkaSource",
"kafka-connect = datahub.ingestion.source.kafka_connect.kafka_connect:KafkaConnectSource",
Empty file.
172 changes: 172 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/jdbc/config.py
@@ -0,0 +1,172 @@
import re
from typing import Dict, List, Optional

from pydantic import Field, validator
from sqlglot import dialects

from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.source_common import (
EnvConfigMixin,
PlatformInstanceConfigMixin,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
)
from datahub.ingestion.source.usage.usage_common import BaseUsageConfig


class SSLConfig(ConfigModel):
    """SSL Certificate configuration"""

    cert_path: Optional[str] = Field(
        default=None,
        description="Path to SSL certificate file",
    )
    cert_content: Optional[str] = Field(
        default=None,
        description="Base64 encoded certificate content",
    )
    cert_type: str = Field(
        default="pem",
        description="Certificate type (pem, jks, p12)",
    )
    cert_password: Optional[str] = Field(
        default=None,
        description="Certificate password if required",
    )

    @validator("cert_type")
    def validate_cert_type(cls, v: str) -> str:
        valid_types = ["pem", "jks", "p12"]
        if v.lower() not in valid_types:
            raise ValueError(f"cert_type must be one of: {', '.join(valid_types)}")
        return v.lower()
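`SSLConfig` accepts a certificate either as a file path or as Base64 content. A minimal sketch of how a caller might turn either form into a file on disk follows. `materialize_cert` is a hypothetical helper, and preferring `cert_path` over `cert_content` is an assumption. None of this is taken from the connector's source.

```python
import base64
import os
import tempfile
from typing import Optional


def materialize_cert(
    cert_path: Optional[str],
    cert_content: Optional[str],
    cert_type: str = "pem",
) -> Optional[str]:
    """Return a filesystem path for the certificate, or None if none was given."""
    if cert_path:
        # Assumption: an explicit path takes precedence over inline content.
        return cert_path
    if not cert_content:
        return None
    raw = base64.b64decode(cert_content)
    # mkstemp creates the file with owner-only permissions.
    fd, path = tempfile.mkstemp(suffix=f".{cert_type}")
    with os.fdopen(fd, "wb") as handle:
        handle.write(raw)
    return path


encoded = base64.b64encode(b"dummy certificate bytes").decode("ascii")
print(materialize_cert(None, encoded, "pem"))
```

The caller is responsible for deleting the temporary file once the connection is closed.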


class JDBCConnectionConfig(ConfigModel):
    """JDBC Connection configuration"""

    uri: str = Field(
        description="JDBC URI (jdbc:protocol://host:port/database)",
    )
    username: Optional[str] = Field(
        default=None,
        description="Database username",
    )
    password: Optional[str] = Field(
        default=None,
        description="Database password",
    )
    properties: Dict[str, str] = Field(
        default_factory=dict,
        description="Additional JDBC properties",
    )
    ssl_config: Optional[SSLConfig] = Field(
        default=None,
        description="SSL configuration",
    )

    @validator("uri")
    def validate_uri(cls, v: str) -> str:
        if not v.startswith("jdbc:"):
            raise ValueError("URI must start with 'jdbc:'")
        return v
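The `uri` validator only checks the `jdbc:` prefix, so a URI with a malformed remainder still passes. JDBC URLs follow the general shape `jdbc:<subprotocol>:<subname>`, and a slightly stricter check can extract the subprotocol. The sketch below is a hypothetical helper (`jdbc_subprotocol`) for illustration, not part of this PR.

```python
def jdbc_subprotocol(uri: str) -> str:
    """Return the subprotocol of a 'jdbc:<subprotocol>:<subname>' URI."""
    if not uri.startswith("jdbc:"):
        raise ValueError("URI must start with 'jdbc:'")
    rest = uri[len("jdbc:"):]
    # Split on the first colon after the prefix; both parts must be present.
    subprotocol, separator, _subname = rest.partition(":")
    if not separator or not subprotocol:
        raise ValueError("URI must look like 'jdbc:<subprotocol>:<subname>'")
    return subprotocol


print(jdbc_subprotocol("jdbc:postgresql://localhost:5432/mydb"))
print(jdbc_subprotocol("jdbc:oracle:thin:@host:1521:sid"))
```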


class JDBCDriverConfig(ConfigModel):
    """JDBC Driver configuration"""

    driver_class: str = Field(
        description="Fully qualified JDBC driver class name",
    )
    driver_path: Optional[str] = Field(
        default=None,
        description="Path to JDBC driver JAR",
    )
    maven_coordinates: Optional[str] = Field(
        default=None,
        description="Maven coordinates (groupId:artifactId:version)",
    )

    @validator("driver_class")
    def validate_driver_class(cls, v: str) -> str:
        if not v:
            raise ValueError("driver_class must be specified")
        return v

    @validator("maven_coordinates")
    def validate_maven_coordinates(cls, v: Optional[str]) -> Optional[str]:
        if v and not re.match(r"^[^:]+:[^:]+:[^:]+$", v):
            raise ValueError(
                "maven_coordinates must be in format 'groupId:artifactId:version'"
            )
        return v
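To make the behaviour of the `maven_coordinates` pattern concrete, the sketch below applies the same regular expression to a few inputs. `is_valid_coordinates` is a hypothetical wrapper used only for this illustration.

```python
import re

# Same pattern as the validator: exactly three non-empty, colon-separated parts.
MAVEN_COORDINATES = re.compile(r"^[^:]+:[^:]+:[^:]+$")


def is_valid_coordinates(value: str) -> bool:
    return bool(MAVEN_COORDINATES.match(value))


for value in [
    "org.postgresql:postgresql:42.7.1",  # groupId:artifactId:version
    "org.postgresql:postgresql",  # version missing
    "org.postgresql:postgresql:jar:42.7.1",  # four parts (with packaging)
]:
    print(value, is_valid_coordinates(value))
```

Two consequences of the pattern: four-part coordinates that include a packaging or classifier are rejected, and because the validator only runs the check when the value is truthy, an empty string is accepted unchanged.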


class JDBCSourceConfig(
    StatefulIngestionConfigBase,
    EnvConfigMixin,
    PlatformInstanceConfigMixin,
):
    """Configuration for JDBC metadata extraction"""

    driver: JDBCDriverConfig = Field(
        description="JDBC driver configuration",
    )
    connection: JDBCConnectionConfig = Field(
        description="Database connection configuration",
    )
    platform: str = Field(
        description="Name of platform being ingested.",
    )
    include_tables: bool = Field(
        default=True,
        description="Include tables in extraction",
    )
    include_views: bool = Field(
        default=True,
        description="Include views in extraction",
    )
    include_stored_procedures: bool = Field(
        default=False,
        description="Include stored procedures in extraction",
    )
    sqlglot_dialect: Optional[str] = Field(
        default=None,
        description="sqlglot dialect to use for SQL transpiling",
    )
    jvm_args: List[str] = Field(
        default=[],
        description="JVM arguments for JDBC driver",
    )
    schema_pattern: AllowDenyPattern = Field(
        default=AllowDenyPattern.allow_all(),
        description="Regex patterns for schemas",
    )
    table_pattern: AllowDenyPattern = Field(
        default=AllowDenyPattern.allow_all(),
        description="Regex patterns for tables",
    )
    view_pattern: AllowDenyPattern = Field(
        default=AllowDenyPattern.allow_all(),
        description="Regex patterns for views",
    )
    usage: BaseUsageConfig = Field(
        description="Usage statistics configuration",
        default=BaseUsageConfig(),
    )
    stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None

    @validator("sqlglot_dialect")
    def validate_dialect(cls, v):
        if v is None:
            return v
        valid_dialects = [d for d in dir(dialects) if not d.startswith("_")]
        if v not in valid_dialects:
            raise ValueError(
                f"Invalid dialect '{v}'. Must be one of: {', '.join(sorted(valid_dialects))}"
            )
        return v
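`validate_dialect` treats every public attribute of the `dialects` package as a valid dialect name. Depending on the sqlglot version, `dir()` on a package can also list helper classes and submodules, so the accepted set may be broader than intended. The sketch below illustrates the effect with a stand-in module built from `types.ModuleType`. The stand-in, its attribute names, and `public_names` are all hypothetical, and no sqlglot code is involved.

```python
import types
from typing import List

# Hypothetical stand-in for a package that exposes dialect classes,
# their submodules, and a shared base class.
fake_dialects = types.ModuleType("fake_dialects")
fake_dialects.Postgres = type("Postgres", (), {})
fake_dialects.postgres = types.ModuleType("fake_dialects.postgres")
fake_dialects.Dialect = type("Dialect", (), {})


def public_names(module: types.ModuleType) -> List[str]:
    """Same filter as the validator: every attribute not starting with '_'."""
    return [name for name in dir(module) if not name.startswith("_")]


names = public_names(fake_dialects)
print(names)
# The base class name passes the filter even though it is not a dialect.
print("Dialect" in names)
```

A stricter validator could compare against an explicit collection of dialect names instead of the attribute listing. Which collection to use depends on the pinned sqlglot version.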