@@ -104,7 +104,7 @@ def as_pydantic_object(
assert self.schema_ref
assert self.schema_ref == model_type.__name__
object_dict = self.as_raw_json()
return model_type.parse_obj(object_dict)
return model_type.model_validate(object_dict)

@classmethod
def from_resource_value(
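For readers less familiar with the Pydantic v2 API, here is a minimal sketch (not part of this diff; the model name is made up) of the rename applied in every hunk below: v1's `parse_obj` is spelled `model_validate` in v2, with the same dict-in / model-out behaviour.

```python
from pydantic import BaseModel


class ExampleModel(BaseModel):
    name: str
    count: int = 0


raw = {"name": "dataset-a", "count": 3}

# Pydantic v1 spelling (still present in v2, but emits a deprecation warning):
# obj = ExampleModel.parse_obj(raw)

# Pydantic v2 spelling, as used throughout this PR:
obj = ExampleModel.model_validate(raw)
print(obj.name, obj.count)  # dataset-a 3
```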
@@ -461,7 +461,7 @@ def from_yaml(cls, file: str) -> Iterable["Dataset"]:
if isinstance(datasets, dict):
datasets = [datasets]
for dataset_raw in datasets:
dataset = Dataset.parse_obj(dataset_raw)
dataset = Dataset.model_validate(dataset_raw)
# dataset = Dataset.model_validate(dataset_raw, strict=True)
yield dataset

2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/api/entities/forms/forms.py
@@ -137,7 +137,7 @@ def create(file: str) -> None:
with get_default_graph(ClientMode.CLI) as emitter, open(file) as fp:
forms: List[dict] = yaml.safe_load(fp)
for form_raw in forms:
form = Forms.parse_obj(form_raw)
form = Forms.model_validate(form_raw)

try:
if not FormType.has_value(form.type):
@@ -138,7 +138,7 @@ def from_yaml(file: str) -> List["StructuredProperties"]:

result: List[StructuredProperties] = []
for structuredproperty_raw in structuredproperties:
result.append(StructuredProperties.parse_obj(structuredproperty_raw))
result.append(StructuredProperties.model_validate(structuredproperty_raw))
return result

def generate_mcps(self) -> List[MetadataChangeProposalWrapper]:
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/cli/check_cli.py
@@ -316,7 +316,7 @@ def test_allow_deny(config: str, input: str, pattern_key: str) -> None:
click.secho(f"{pattern_key} is not defined in the config", fg="red")
exit(1)

allow_deny_pattern = AllowDenyPattern.parse_obj(pattern_dict)
allow_deny_pattern = AllowDenyPattern.model_validate(pattern_dict)
if allow_deny_pattern.allowed(input):
click.secho(f"✅ {input} is allowed by {pattern_key}", fg="green")
exit(0)
@@ -372,7 +372,7 @@ def test_path_spec(config: str, input: str, path_spec_key: str) -> None:
pattern_dicts = [pattern_dicts]

for pattern_dict in pattern_dicts:
path_spec_pattern = PathSpec.parse_obj(pattern_dict)
path_spec_pattern = PathSpec.model_validate(pattern_dict)
if path_spec_pattern.allowed(input):
click.echo(f"{input} is allowed by {path_spec_pattern}")
else:
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/cli/config_utils.py
@@ -114,7 +114,7 @@ def load_client_config() -> DatahubClientConfig:
try:
_ensure_datahub_config()
client_config_dict = get_raw_client_config()
datahub_config: DatahubClientConfig = DatahubConfig.parse_obj(
datahub_config: DatahubClientConfig = DatahubConfig.model_validate(
client_config_dict
).gms
return datahub_config
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/cli/lite_cli.py
@@ -46,7 +46,7 @@ class LiteCliConfig(DatahubConfig):

def get_lite_config() -> LiteLocalConfig:
client_config_dict = get_raw_client_config()
lite_config = LiteCliConfig.parse_obj(client_config_dict)
lite_config = LiteCliConfig.model_validate(client_config_dict)
return lite_config.lite


@@ -337,7 +337,7 @@ def init(ctx: click.Context, type: Optional[str], file: Optional[str]) -> None:
new_lite_config_dict["type"] = type
if file:
new_lite_config_dict["config"]["file"] = file
new_lite_config = LiteLocalConfig.parse_obj(new_lite_config_dict)
new_lite_config = LiteLocalConfig.model_validate(new_lite_config_dict)
if lite_config != new_lite_config:
if click.confirm(
f"Will replace datahub lite config {lite_config} with {new_lite_config}"
8 changes: 4 additions & 4 deletions metadata-ingestion/src/datahub/cli/migrate.py
@@ -318,13 +318,13 @@ def migrate_containers(
try:
newKey: Union[SchemaKey, DatabaseKey, ProjectIdKey, BigQueryDatasetKey]
if subType == "Schema":
newKey = SchemaKey.parse_obj(customProperties)
newKey = SchemaKey.model_validate(customProperties)
elif subType == "Database":
newKey = DatabaseKey.parse_obj(customProperties)
newKey = DatabaseKey.model_validate(customProperties)
elif subType == "Project":
newKey = ProjectIdKey.parse_obj(customProperties)
newKey = ProjectIdKey.model_validate(customProperties)
elif subType == "Dataset":
newKey = BigQueryDatasetKey.parse_obj(customProperties)
newKey = BigQueryDatasetKey.model_validate(customProperties)
else:
log.warning(f"Invalid subtype {subType}. Skipping")
continue
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/cli/quickstart_versioning.py
@@ -80,7 +80,7 @@ def fetch_quickstart_config(cls) -> "QuickstartVersionMappingConfig":
path = os.path.expanduser(LOCAL_QUICKSTART_MAPPING_FILE)
with open(path) as f:
config_raw = yaml.safe_load(f)
return cls.parse_obj(config_raw)
return cls.model_validate(config_raw)

config_raw = None
try:
@@ -110,7 +110,7 @@ def fetch_quickstart_config(cls) -> "QuickstartVersionMappingConfig":
}
)

config = cls.parse_obj(config_raw)
config = cls.model_validate(config_raw)

# If stable is not defined in the config, we need to fetch the latest version from github.
if config.quickstart_version_map.get("stable") is None:
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/cli/specific/group_cli.py
@@ -42,7 +42,7 @@ def upsert(file: Path, override_editable: bool) -> None:
with get_default_graph(ClientMode.CLI) as emitter:
for group_config in group_configs:
try:
datahub_group = CorpGroup.parse_obj(group_config)
datahub_group = CorpGroup.model_validate(group_config)
for mcp in datahub_group.generate_mcp(
generation_config=CorpGroupGenerationConfig(
override_editable=override_editable, datahub_graph=emitter
@@ -85,7 +85,7 @@ def to_yaml_list(
with open(file, "r") as fp:
existing_objects = yaml.load(fp) # this is a list of dicts
existing_objects = [
StructuredProperties.parse_obj(obj) for obj in existing_objects
StructuredProperties.model_validate(obj) for obj in existing_objects
]
objects = [obj for obj in objects]
# do a positional update of the existing objects
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/cli/specific/user_cli.py
@@ -42,7 +42,7 @@ def upsert(file: Path, override_editable: bool) -> None:
with get_default_graph(ClientMode.CLI) as emitter:
for user_config in user_configs:
try:
datahub_user: CorpUser = CorpUser.parse_obj(user_config)
datahub_user: CorpUser = CorpUser.model_validate(user_config)

emitter.emit_all(
datahub_user.generate_mcp(
16 changes: 14 additions & 2 deletions metadata-ingestion/src/datahub/configuration/common.py
@@ -140,6 +140,18 @@ def _schema_extra(schema: Dict[str, Any], model: Type["ConfigModel"]) -> None:

@classmethod
def parse_obj_allow_extras(cls, obj: Any) -> Self:
"""Parse an object while allowing extra fields.

'parse_obj' in Pydantic v1 is equivalent to 'model_validate' in Pydantic v2.
However, 'parse_obj_allow_extras' in v1 is not directly available in v2.

`model_validate(..., strict=False)` does not work because it still raises errors on extra fields;
strict=False only affects type coercion and validation strictness, not extra field handling.

This method temporarily modifies the model's configuration to allow extra fields.

TODO: Do we really need to support this behaviour? Consider removing this method in the future.
"""
if PYDANTIC_VERSION_2:
try:
with unittest.mock.patch.dict(
@@ -148,12 +160,12 @@ def parse_obj_allow_extras(cls, obj: Any) -> Self:
clear=False,
):
cls.model_rebuild(force=True) # type: ignore
return cls.parse_obj(obj)
return cls.model_validate(obj)
finally:
cls.model_rebuild(force=True) # type: ignore
else:
with unittest.mock.patch.object(cls.Config, "extra", pydantic.Extra.allow):
return cls.parse_obj(obj)
return cls.model_validate(obj)


class PermissiveConfigModel(ConfigModel):
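A hedged sketch (hypothetical models, not DataHub's `ConfigModel`) of the behaviour the new docstring describes: `model_validate(..., strict=False)` still rejects extra fields when the model config forbids them, so accepting extras requires changing the config itself — which is what `parse_obj_allow_extras` does temporarily via the `model_config` patch above.

```python
from pydantic import BaseModel, ConfigDict, ValidationError


class StrictExample(BaseModel):
    model_config = ConfigDict(extra="forbid")

    name: str


try:
    # strict=False does not help here: the extra key is still rejected.
    StrictExample.model_validate({"name": "x", "unexpected": 1}, strict=False)
except ValidationError as err:
    print("rejected:", err.error_count(), "error(s)")


# Only a config that allows extras accepts the same payload.
class LenientExample(StrictExample):
    model_config = ConfigDict(extra="allow")


obj = LenientExample.model_validate({"name": "x", "unexpected": 1})
print(obj.name)  # x
```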
77 changes: 47 additions & 30 deletions metadata-ingestion/src/datahub/configuration/git.py
@@ -1,7 +1,14 @@
import pathlib
from copy import deepcopy
from typing import Any, Dict, Optional, Union

from pydantic import Field, FilePath, SecretStr, validator
from pydantic import (
Field,
FilePath,
SecretStr,
field_validator,
model_validator,
)

from datahub.configuration.common import ConfigModel
from datahub.configuration.validate_field_rename import pydantic_renamed_field
@@ -41,7 +48,8 @@ class GitReference(ConfigModel):
transform=lambda url: _GITHUB_URL_TEMPLATE,
)

@validator("repo", pre=True)
@field_validator("repo", mode="before")
@classmethod
def simplify_repo_url(cls, repo: str) -> str:
if repo.startswith("github.com/") or repo.startswith("gitlab.com"):
repo = f"https://{repo}"
@@ -53,21 +61,22 @@ def simplify_repo_url(cls, repo: str) -> str:

return repo

@validator("url_template", always=True)
def infer_url_template(cls, url_template: Optional[str], values: dict) -> str:
if url_template is not None:
return url_template
@model_validator(mode="after")
def infer_url_template(self) -> "GitReference":
if self.url_template is not None:
return self

repo: str = values["repo"]
if repo.startswith(_GITHUB_PREFIX):
return _GITHUB_URL_TEMPLATE
elif repo.startswith(_GITLAB_PREFIX):
return _GITLAB_URL_TEMPLATE
if self.repo.startswith(_GITHUB_PREFIX):
self.url_template = _GITHUB_URL_TEMPLATE
elif self.repo.startswith(_GITLAB_PREFIX):
self.url_template = _GITLAB_URL_TEMPLATE
else:
raise ValueError(
"Unable to infer URL template from repo. Please set url_template manually."
)

return self

def get_url_for_file_path(self, file_path: str) -> str:
assert self.url_template
if self.url_subdir:
@@ -98,35 +107,43 @@ class GitInfo(GitReference):

_fix_deploy_key_newlines = pydantic_multiline_string("deploy_key")

@validator("deploy_key", pre=True, always=True)
@model_validator(mode="before")
@classmethod
def deploy_key_filled_from_deploy_key_file(
cls, v: Optional[SecretStr], values: Dict[str, Any]
) -> Optional[SecretStr]:
if v is None:
cls, values: Dict[str, Any]
) -> Dict[str, Any]:
# In-place update of the input dict would cause state contamination.
# So a deepcopy is performed first.
values = deepcopy(values)

if values.get("deploy_key") is None:
deploy_key_file = values.get("deploy_key_file")
if deploy_key_file is not None:
with open(deploy_key_file) as fp:
deploy_key = SecretStr(fp.read())
return deploy_key
return v

@validator("repo_ssh_locator", always=True)
def infer_repo_ssh_locator(
cls, repo_ssh_locator: Optional[str], values: dict
) -> str:
if repo_ssh_locator is not None:
return repo_ssh_locator

repo: str = values["repo"]
if repo.startswith(_GITHUB_PREFIX):
return f"[email protected]:{repo[len(_GITHUB_PREFIX) :]}.git"
elif repo.startswith(_GITLAB_PREFIX):
return f"[email protected]:{repo[len(_GITLAB_PREFIX) :]}.git"
values["deploy_key"] = deploy_key
return values

@model_validator(mode="after")
def infer_repo_ssh_locator(self) -> "GitInfo":
if self.repo_ssh_locator is not None:
return self

if self.repo.startswith(_GITHUB_PREFIX):
self.repo_ssh_locator = (
f"[email protected]:{self.repo[len(_GITHUB_PREFIX) :]}.git"
)
elif self.repo.startswith(_GITLAB_PREFIX):
self.repo_ssh_locator = (
f"[email protected]:{self.repo[len(_GITLAB_PREFIX) :]}.git"
)
else:
raise ValueError(
"Unable to infer repo_ssh_locator from repo. Please set repo_ssh_locator manually."
)

return self

@property
def branch_for_clone(self) -> Optional[str]:
# If branch was manually set, we should use it. Otherwise return None.
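The git.py changes follow the standard v1 → v2 validator migration: `@validator(..., pre=True)` becomes `@field_validator(..., mode="before")` with an explicit `@classmethod`, and `@validator(..., always=True)` validators that read sibling fields from `values` become `@model_validator(mode="after")` methods on the constructed instance. A minimal sketch with a hypothetical model (not `GitReference` itself):

```python
from typing import Optional

from pydantic import BaseModel, field_validator, model_validator


class RepoRef(BaseModel):
    repo: str
    url_template: Optional[str] = None

    @field_validator("repo", mode="before")
    @classmethod
    def normalize_repo(cls, repo: str) -> str:
        # mode="before" sees the raw input, like v1's pre=True.
        if repo.startswith("github.com/"):
            repo = f"https://{repo}"
        return repo

    @model_validator(mode="after")
    def infer_url_template(self) -> "RepoRef":
        # mode="after" runs on the fully validated instance, so cross-field
        # defaults no longer need v1's `values` dict (or always=True).
        if self.url_template is None and self.repo.startswith("https://github.com/"):
            self.url_template = self.repo + "/blob/{branch}/{file_path}"
        return self


ref = RepoRef(repo="github.com/acme/widgets")
print(ref.repo, ref.url_template)
```

The `mode="before"` model validator for `deploy_key` additionally deep-copies the incoming dict before mutating it, since before-validators receive the caller's raw input and should not contaminate it.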
52 changes: 26 additions & 26 deletions metadata-ingestion/src/datahub/configuration/time_window_config.py
@@ -1,10 +1,9 @@
import enum
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List
from typing import Any, List

import humanfriendly
import pydantic
from pydantic.fields import Field
from pydantic import Field, ValidationInfo, field_validator, model_validator

from datahub.configuration.common import ConfigModel
from datahub.configuration.datetimes import parse_absolute_time, parse_relative_timespan
@@ -52,45 +51,46 @@ class BaseTimeWindowConfig(ConfigModel):
description="Earliest date of lineage/usage to consider. Default: Last full day in UTC (or hour, depending on `bucket_duration`). You can also specify relative time with respect to end_time such as '-7 days' Or '-7d'.",
) # type: ignore

@pydantic.validator("start_time", pre=True, always=True)
def default_start_time(
cls, v: Any, values: Dict[str, Any], **kwargs: Any
) -> datetime:
if v is None:
return get_time_bucket(
values["end_time"]
- get_bucket_duration_delta(values["bucket_duration"]),
values["bucket_duration"],
)
elif isinstance(v, str):
@field_validator("start_time", mode="before")
@classmethod
def parse_start_time(cls, v: Any, info: ValidationInfo) -> Any:
if isinstance(v, str):
# This is where start_time str is resolved to datetime
try:
delta = parse_relative_timespan(v)
assert delta < timedelta(0), (
"Relative start time should start with minus sign (-) e.g. '-2 days'."
)
assert abs(delta) >= get_bucket_duration_delta(
values["bucket_duration"]
), (
bucket_duration = info.data.get("bucket_duration", BucketDuration.DAY)
assert abs(delta) >= get_bucket_duration_delta(bucket_duration), (
"Relative start time should be in terms of configured bucket duration. e.g '-2 days' or '-2 hours'."
)
Comment on lines +65 to 67

@aikido-pr-checks (bot) · Oct 21, 2025

Dangerous use of assert - low severity
When running Python in production in optimized mode, assert calls are not executed. This mode is enabled by setting the PYTHONOPTIMIZE command line flag. Optimized mode is usually ON in production. Any safety check done using assert will not be executed.

Remediation: Raise an exception instead of using assert.
# The end_time's default value is not yet populated, in which case
# we can just manually generate it here.
if "end_time" not in values:
values["end_time"] = datetime.now(tz=timezone.utc)
# We need end_time, but it might not be set yet
# In that case, we'll use the default
end_time = info.data.get("end_time")
if end_time is None:
end_time = datetime.now(tz=timezone.utc)

return get_time_bucket(
values["end_time"] + delta, values["bucket_duration"]
)
return get_time_bucket(end_time + delta, bucket_duration)
except humanfriendly.InvalidTimespan:
# We do not floor start_time to the bucket start time if absolute start time is specified.
# If user has specified absolute start time in recipe, it's most likely that he means it.
return parse_absolute_time(v)

return v

@pydantic.validator("start_time", "end_time")
@model_validator(mode="after")
def default_start_time(self) -> "BaseTimeWindowConfig":
# Only calculate start_time if it was None (not provided by user)
if self.start_time is None:
self.start_time = get_time_bucket(
self.end_time - get_bucket_duration_delta(self.bucket_duration),
self.bucket_duration,
)
return self

@field_validator("start_time", "end_time")
@classmethod
def ensure_timestamps_in_utc(cls, v: datetime) -> datetime:
if v.tzinfo is None:
raise ValueError(
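The time_window_config.py rewrite splits the old `pre=True, always=True` validator in two: a `mode="before"` field validator that only parses raw string input, and a `mode="after"` model validator that fills in the default `start_time` once `end_time` and `bucket_duration` are fully resolved. Below is a simplified, hedged sketch of the same shape (field names reused, but the parsing logic is a stand-in), which also follows the review bot's advice above by raising `ValueError` instead of relying on `assert`.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Union

from pydantic import BaseModel, Field, field_validator, model_validator


class TimeWindow(BaseModel):
    end_time: datetime = Field(default_factory=lambda: datetime.now(tz=timezone.utc))
    start_time: Optional[datetime] = None

    @field_validator("start_time", mode="before")
    @classmethod
    def parse_start_time(cls, v: Union[str, datetime, None]) -> Union[datetime, None]:
        if isinstance(v, str):
            # Stand-in for parse_relative_timespan: only handles "-<N>d" here.
            if not v.startswith("-"):
                # Raise instead of assert so the check survives PYTHONOPTIMIZE.
                raise ValueError("Relative start time must be negative, e.g. '-7d'.")
            days = int(v.lstrip("-").rstrip("d"))
            return datetime.now(tz=timezone.utc) - timedelta(days=days)
        return v

    @model_validator(mode="after")
    def default_start_time(self) -> "TimeWindow":
        # Runs after end_time is known, so the default no longer has to guess.
        if self.start_time is None:
            self.start_time = self.end_time - timedelta(days=1)
        return self


print(TimeWindow(start_time="-7d").start_time)
print(TimeWindow().start_time)  # defaults to end_time minus one day
```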