Skip to content
3 changes: 1 addition & 2 deletions cognite_toolkit/_cdf_tk/apps/_migrate_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
self.command("timeseries")(self.timeseries)
self.command("files")(self.files)
self.command("canvas")(self.canvas)
# Uncomment when infield v2 config migration is ready
# self.command("infield-configs")(self.infield_configs)
self.command("infield-configs")(self.infield_configs)

def main(self, ctx: typer.Context) -> None:
"""Migrate resources from Asset-Centric to data modeling in CDF."""
Expand Down
54 changes: 52 additions & 2 deletions cognite_toolkit/_cdf_tk/commands/_migrate/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,15 +259,65 @@ def create(
verbose: bool = False,
) -> DeployResults:
"""This method is used to create migration resource in CDF."""
self.validate_migration_model_available(client)
# Only validate migration model if the creator uses lineage/mapping
# InfieldV2ConfigCreator doesn't use the migration model, so skip validation
if creator.HAS_LINEAGE:
self.validate_migration_model_available(client)

deploy_cmd = DeployCommand(self.print_warning, silent=self.silent)
deploy_cmd.tracker = self.tracker

results = DeployResults([], "deploy", dry_run=dry_run)

# Special handling for InfieldV2ConfigCreator which creates two different resource types
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this true, do you expect APM_Config to turn into location filter(s) and a special Infield v2 configuration?

from cognite_toolkit._cdf_tk.commands._migrate.creators import InfieldV2ConfigCreator
from cognite_toolkit._cdf_tk.cruds import LocationFilterCRUD

if isinstance(creator, InfieldV2ConfigCreator):
# Deploy LocationFilters using Location Filters API
location_filters = creator.create_location_filters()
if location_filters:
location_filter_crud = LocationFilterCRUD.create_loader(client)
location_filter_worker = ResourceWorker(location_filter_crud, "deploy")
location_filter_by_id = {
location_filter_crud.get_id(item): (item.dump(), item) for item in location_filters
}
location_filter_worker.validate_access(location_filter_by_id, is_dry_run=dry_run)
cdf_location_filters = location_filter_crud.retrieve(list(location_filter_by_id.keys()))
location_filter_resources = location_filter_worker.categorize_resources(
location_filter_by_id, cdf_location_filters, False, verbose
)

if dry_run:
location_filter_result = deploy_cmd.dry_run_deploy(
location_filter_resources, location_filter_crud, False, False
)
else:
location_filter_result = deploy_cmd.actual_deploy(location_filter_resources, location_filter_crud)

verb = "Would deploy" if dry_run else "Deploying"
self.console(f"{verb} {len(location_filters)} location filters to CDF.")

location_filter_configs = creator.location_filter_configs(location_filters)
for config in location_filter_configs:
filepath = (
output_dir
/ location_filter_crud.folder_name
/ f"{sanitize_filename(config.filestem)}.{location_filter_crud.kind}.yaml"
)
filepath.parent.mkdir(parents=True, exist_ok=True)
safe_write(filepath, yaml_safe_dump(config.data))
self.console(
f"{len(location_filter_configs)} {location_filter_crud.kind} resource configurations written to {(output_dir / location_filter_crud.folder_name).as_posix()!r}"
)

if location_filter_result:
results[location_filter_result.name] = location_filter_result

# Deploy InFieldLocationConfig nodes using Data Modeling Instance API
crud_cls = creator.CRUD
resource_list = creator.create_resources()

results = DeployResults([], "deploy", dry_run=dry_run)
crud = crud_cls.create_loader(client)
worker = ResourceWorker(crud, "deploy")
local_by_id = {crud.get_id(item): (item.dump(), item) for item in resource_list}
Expand Down
80 changes: 73 additions & 7 deletions cognite_toolkit/_cdf_tk/commands/_migrate/creators.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@

from cognite_toolkit._cdf_tk.client import ToolkitClient
from cognite_toolkit._cdf_tk.client.data_classes.apm_config_v1 import APMConfig, APMConfigList
from cognite_toolkit._cdf_tk.cruds import NodeCRUD, ResourceCRUD, SpaceCRUD
from cognite_toolkit._cdf_tk.client.data_classes.location_filters import LocationFilterWriteList
from cognite_toolkit._cdf_tk.cruds import LocationFilterCRUD, NodeCRUD, ResourceCRUD, SpaceCRUD
from cognite_toolkit._cdf_tk.exceptions import ToolkitRequiredValueError
from cognite_toolkit._cdf_tk.utils import humanize_collection

from .data_model import CREATED_SOURCE_SYSTEM_VIEW_ID, SPACE, SPACE_SOURCE_VIEW_ID
from .infield_config import create_infield_v2_config


@dataclass
Expand Down Expand Up @@ -215,17 +217,81 @@ class InfieldV2ConfigCreator(MigrationCreator[NodeApplyList]):
HAS_LINEAGE = False

def create_resources(self) -> NodeApplyList:
"""Create InFieldLocationConfig nodes (LocationFilters are handled separately)."""
apm_config_nodes = self.client.data_modeling.instances.list(instance_type="node", sources=APMConfig.view_id)
apm_config = APMConfigList.from_nodes(apm_config_nodes)

new_config_nodes = NodeApplyList([])
# Filter configs: prefer APP_CONFIG_V2, fallback to default-config, otherwise skip
config_to_migrate = None
for config in apm_config:
new_config = self._create_infield_v2_config(config)
new_config_nodes.append(new_config)
return new_config_nodes
if config.external_id == "APP_CONFIG_V2":
config_to_migrate = config
break
if config_to_migrate is None:
for config in apm_config:
if config.external_id == "default-config":
config_to_migrate = config
break

if config_to_migrate is None:
return NodeApplyList([])

if not config_to_migrate.feature_configuration or not config_to_migrate.feature_configuration.root_location_configurations:
return NodeApplyList([])

feature_config = config_to_migrate.feature_configuration
root_location_configs = feature_config.root_location_configurations or []
# Convert feature_config to dict for disciplines and dataExplorationConfig migration
feature_config_dict = feature_config.dump(camel_case=True) if hasattr(feature_config, "dump") else None
migration_result = create_infield_v2_config(
root_location_configs,
feature_configuration=feature_config_dict,
config_external_id=config_to_migrate.external_id,
client=self.client,
)
return migration_result.all_nodes()

def create_location_filters(self) -> LocationFilterWriteList:
"""Create LocationFilter resources (to be deployed via Location Filters API)."""
apm_config_nodes = self.client.data_modeling.instances.list(instance_type="node", sources=APMConfig.view_id)
apm_config = APMConfigList.from_nodes(apm_config_nodes)

# Filter configs: prefer APP_CONFIG_V2, fallback to default-config, otherwise skip
config_to_migrate = None
for config in apm_config:
if config.external_id == "APP_CONFIG_V2":
config_to_migrate = config
break
if config_to_migrate is None:
for config in apm_config:
if config.external_id == "default-config":
config_to_migrate = config
break

if config_to_migrate is None:
return LocationFilterWriteList([])

if not config_to_migrate.feature_configuration or not config_to_migrate.feature_configuration.root_location_configurations:
return LocationFilterWriteList([])

feature_config = config_to_migrate.feature_configuration
root_location_configs = feature_config.root_location_configurations or []
# Convert feature_config to dict (not needed for location filters, but keep consistent)
feature_config_dict = feature_config.dump(camel_case=True) if hasattr(feature_config, "dump") else None
migration_result = create_infield_v2_config(
root_location_configs,
feature_configuration=feature_config_dict,
config_external_id=config_to_migrate.external_id,
client=self.client,
)
return migration_result.all_location_filters()

def resource_configs(self, resources: NodeApplyList) -> list[ResourceConfig]:
return [ResourceConfig(filestem=node.external_id, data=node.dump()) for node in resources]

def _create_infield_v2_config(self, config: APMConfig) -> NodeApply:
raise NotImplementedError("To be implemented")
def location_filter_configs(self, resources: LocationFilterWriteList) -> list[ResourceConfig]:
return [ResourceConfig(filestem=location_filter.external_id, data=location_filter.dump()) for location_filter in resources]

def store_lineage(self, resources: NodeApplyList) -> int:
# No lineage to store for Infield V2 configs
return 0
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""InField V2 configuration migration module.

This module handles migration from old APM Config format to InField V2 configuration format.
The migration is split into separate modules for better organization and maintainability.
"""

from .migration import create_infield_v2_config

__all__ = ["create_infield_v2_config"]

Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""Constants for InField V2 config migration."""

from cognite.client.data_classes.data_modeling import ViewId
from cognite.client.data_classes.data_modeling.ids import DataModelId

# View IDs for the new format
LOCATION_CONFIG_VIEW_ID = ViewId("infield_cdm_source_desc_sche_asset_file_ts", "InFieldLocationConfig", "v1")
DATA_EXPLORATION_CONFIG_VIEW_ID = ViewId("infield_cdm_source_desc_sche_asset_file_ts", "DataExplorationConfig", "v1")

# Target space for InFieldLocationConfig nodes
TARGET_SPACE = "APM_Config"

# Default data model for LocationFilter
DEFAULT_LOCATION_FILTER_DATA_MODEL = DataModelId(
space="infield_cdm_source_desc_sche_asset_file_ts",
external_id="InFieldOnCDM",
version="v1",
)

Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
"""Migration of DataExplorationConfig for InField V2 config migration.

This module handles the creation of a single DataExplorationConfig node per APM Config,
which is shared across all InFieldLocationConfig nodes via a direct relation.
"""

from typing import Any

from cognite.client.data_classes.data_modeling import NodeApply, NodeOrEdgeData

from .constants import DATA_EXPLORATION_CONFIG_VIEW_ID, TARGET_SPACE
from .types_new import DataExplorationConfigProperties


def create_data_exploration_config_node(
feature_configuration: dict[str, Any] | None,
config_external_id: str,
) -> NodeApply | None:
"""Create a DataExplorationConfig node from FeatureConfiguration.

Only one DataExplorationConfig is created per APM Config, shared across all locations.

Args:
feature_configuration: FeatureConfiguration dict from old APM Config
config_external_id: External ID of the APM Config (used to generate DataExplorationConfig external ID)

Returns:
NodeApply for DataExplorationConfig, or None if feature_configuration is missing or incomplete
"""
if not feature_configuration:
return None

# Extract properties from FeatureConfiguration
props: DataExplorationConfigProperties = {}

# Migrate observations
if observations := feature_configuration.get("observations"):
props["observations"] = observations

# Migrate activities
if activities := feature_configuration.get("activities"):
props["activities"] = activities

# Migrate documents
if documents := feature_configuration.get("documents"):
# Remove metadata. prefix from type and description if present
migrated_documents = documents.copy()
if "type" in migrated_documents and isinstance(migrated_documents["type"], str):
migrated_documents["type"] = migrated_documents["type"].removeprefix("metadata.")
if "description" in migrated_documents and isinstance(migrated_documents["description"], str):
migrated_documents["description"] = migrated_documents["description"].removeprefix("metadata.")
props["documents"] = migrated_documents

# Migrate notifications
if notifications := feature_configuration.get("notifications"):
props["notifications"] = notifications

# Migrate assets (from assetPageConfiguration)
if assets := feature_configuration.get("assetPageConfiguration"):
props["assets"] = assets

# Only create node if at least one property is present
if not props:
return None

# Generate external ID for DataExplorationConfig
data_exploration_external_id = f"data_exploration_{config_external_id}"

return NodeApply(
space=TARGET_SPACE,
external_id=data_exploration_external_id,
sources=[NodeOrEdgeData(source=DATA_EXPLORATION_CONFIG_VIEW_ID, properties=props)],
)


def get_data_exploration_config_external_id(config_external_id: str) -> str:
"""Generate external ID for DataExplorationConfig node.

Args:
config_external_id: External ID of the APM Config

Returns:
External ID for the DataExplorationConfig node
"""
return f"data_exploration_{config_external_id}"

Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""InFieldLocationConfig field migration module."""

from .fields import apply_location_config_fields

__all__ = ["apply_location_config_fields"]

Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""Migration of accessManagement field for InFieldLocationConfig."""

from typing import Any

from ..types_new import AccessManagement


def migrate_access_management(location_dict: dict[str, Any]) -> AccessManagement | None:
"""Migrate accessManagement from old configuration.

Extracts templateAdmins and checklistAdmins from the old location configuration
and creates an AccessManagement dict.

Args:
location_dict: Location configuration dict (from dump(camel_case=True))

Returns:
AccessManagement dict, or None if neither templateAdmins nor checklistAdmins are present
"""
template_admins = location_dict.get("templateAdmins")
checklist_admins = location_dict.get("checklistAdmins")

# Only create accessManagement if at least one of the fields is present
if not template_admins and not checklist_admins:
return None

access_management: AccessManagement = {}
if template_admins:
access_management["templateAdmins"] = template_admins
if checklist_admins:
access_management["checklistAdmins"] = checklist_admins

return access_management

Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""Migration of appInstanceSpace field for InFieldLocationConfig."""

from typing import Any


def migrate_app_instance_space(location_dict: dict[str, Any]) -> str | None:
"""Migrate appInstanceSpace from appDataInstanceSpace.

Extracts the appDataInstanceSpace from the old configuration and returns it
as appInstanceSpace. If appDataInstanceSpace is not present or is an empty string,
returns None.

Args:
location_dict: Location configuration dict (from dump(camel_case=True))

Returns:
App instance space string, or None if not present or empty
"""
app_instance_space = location_dict.get("appDataInstanceSpace")
if not app_instance_space: # None or empty string
return None

return app_instance_space

Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""Migration of dataFilters field for InFieldLocationConfig."""

from typing import Any

from ..types_new import RootLocationDataFilters


def migrate_data_filters(location_dict: dict[str, Any]) -> RootLocationDataFilters | None:
"""Migrate dataFilters from old configuration.

Extracts the dataFilters dictionary from the old location configuration
and returns it as a RootLocationDataFilters dict. The structure matches
between old and new format, so it can be returned as-is.

Args:
location_dict: Location configuration dict (from dump(camel_case=True))

Returns:
RootLocationDataFilters dict, or None if dataFilters is not present
"""
data_filters = location_dict.get("dataFilters")
if not data_filters:
return None

# Return the data filters as-is (already in the correct format)
return data_filters

Loading