Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combine gendata config directly #8554

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/ert/callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,20 @@ async def _write_responses_to_storage(
for config in response_configs:
try:
start_time = time.perf_counter()
logger.debug(f"Starting to load response: {config.name}")
logger.debug(f"Starting to load response: {config.response_type}")
ds = config.read_from_file(run_arg.runpath, run_arg.iens)
await asyncio.sleep(0)
logger.debug(
f"Loaded {config.name}",
f"Loaded {config.response_type}",
extra={"Time": f"{(time.perf_counter() - start_time):.4f}s"},
)
start_time = time.perf_counter()
run_arg.ensemble_storage.save_response(config.name, ds, run_arg.iens)
run_arg.ensemble_storage.save_response(
config.response_type, ds, run_arg.iens
)
await asyncio.sleep(0)
logger.debug(
f"Saved {config.name} to storage",
f"Saved {config.response_type} to storage",
extra={"Time": f"{(time.perf_counter() - start_time):.4f}s"},
)
except ValueError as err:
Expand Down
105 changes: 32 additions & 73 deletions src/ert/config/ensemble_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,29 @@
import os
from collections import Counter
from dataclasses import dataclass, field
from datetime import datetime
from typing import (
Any,
Dict,
List,
Optional,
Sequence,
Union,
no_type_check,
overload,
)

import numpy as np
import numpy.typing as npt

from ert.field_utils import get_shape

from ._read_summary import read_summary
from .field import Field
from .gen_data_config import GenDataConfig
from .gen_kw_config import GenKwConfig
from .parameter_config import ParameterConfig
from .parsing import ConfigDict, ConfigKeys, ConfigValidationError
from .refcase import Refcase
from .response_config import ResponseConfig
from .summary_config import SummaryConfig
from .surface_config import SurfaceConfig

_KNOWN_RESPONSE_TYPES = [SummaryConfig, GenDataConfig]

logger = logging.getLogger(__name__)


Expand All @@ -50,28 +46,6 @@ def _get_abs_path(file: Optional[str]) -> Optional[str]:
return file


@dataclass(eq=False)
class Refcase:
start_date: datetime
keys: List[str]
dates: Sequence[datetime]
values: npt.NDArray[Any]

def __eq__(self, other: object) -> bool:
if not isinstance(other, Refcase):
return False
return bool(
self.start_date == other.start_date
and self.keys == other.keys
and self.dates == other.dates
and np.all(self.values == other.values)
)

@property
def all_dates(self) -> List[datetime]:
return [self.start_date] + list(self.dates)


@dataclass
class EnsembleConfig:
grid_file: Optional[str] = None
Expand All @@ -82,15 +56,17 @@ class EnsembleConfig:

def __post_init__(self) -> None:
self._check_for_duplicate_names(
list(self.parameter_configs.values()), list(self.response_configs.values())
[p.name for p in self.parameter_configs.values()],
[key for config in self.response_configs.values() for key in config.keys],
)

self.grid_file = _get_abs_path(self.grid_file)

@staticmethod
def _check_for_duplicate_names(
parameter_list: List[ParameterConfig], gen_data_list: List[ResponseConfig]
parameter_list: List[str], gen_data_list: List[str]
) -> None:
names_counter = Counter(g.name for g in parameter_list + gen_data_list)
names_counter = Counter(g for g in parameter_list + gen_data_list)
duplicate_names = [n for n, c in names_counter.items() if c > 1]
if duplicate_names:
raise ConfigValidationError.with_context(
Expand All @@ -104,8 +80,6 @@ def _check_for_duplicate_names(
@classmethod
def from_dict(cls, config_dict: ConfigDict) -> EnsembleConfig:
grid_file_path = config_dict.get(ConfigKeys.GRID)
refcase_file_path = config_dict.get(ConfigKeys.REFCASE)
gen_data_list = config_dict.get(ConfigKeys.GEN_DATA, [])
gen_kw_list = config_dict.get(ConfigKeys.GEN_KW, [])
surface_list = config_dict.get(ConfigKeys.SURFACE, [])
field_list = config_dict.get(ConfigKeys.FIELD, [])
Expand All @@ -132,48 +106,24 @@ def make_field(field_list: List[str]) -> Field:
)
return Field.from_config_list(grid_file_path, dims, field_list)

eclbase = config_dict.get("ECLBASE")
if eclbase is not None:
eclbase = eclbase.replace("%d", "<IENS>")
refcase_keys = []
time_map = []
data = None
if refcase_file_path is not None:
try:
start_date, refcase_keys, time_map, data = read_summary(
refcase_file_path, ["*"]
)
except Exception as err:
raise ConfigValidationError(f"Could not read refcase: {err}") from err
parameter_configs = (
[GenKwConfig.from_config_list(g) for g in gen_kw_list]
+ [SurfaceConfig.from_config_list(s) for s in surface_list]
+ [make_field(f) for f in field_list]
)

response_configs: List[ResponseConfig] = [
GenDataConfig.from_config_list(g) for g in gen_data_list
]
refcase = (
Refcase(start_date, refcase_keys, time_map, data)
if data is not None
else None
)
summary_keys = config_dict.get(ConfigKeys.SUMMARY, [])
if summary_keys:
if eclbase is None:
raise ConfigValidationError(
"In order to use summary responses, ECLBASE has to be set."
)
time_map = set(refcase.dates) if refcase is not None else None
response_configs.append(
SummaryConfig(
name="summary",
input_file=eclbase,
keys=[i for val in summary_keys for i in val],
refcase=time_map,
)
)
response_configs: List[ResponseConfig] = []

for config_cls in _KNOWN_RESPONSE_TYPES:
instance = config_cls.from_config_dict(config_dict)

if instance is not None and instance.keys:
response_configs.append(instance)

refcase = Refcase.from_config_dict(config_dict)
eclbase = config_dict.get("ECLBASE")
if eclbase is not None:
eclbase = eclbase.replace("%d", "<IENS>")

return cls(
grid_file=grid_file_path,
Expand All @@ -190,13 +140,22 @@ def __getitem__(self, key: str) -> Union[ParameterConfig, ResponseConfig]:
return self.parameter_configs[key]
elif key in self.response_configs:
return self.response_configs[key]
elif _config := next(
(c for c in self.response_configs.values() if key in c.keys), None
):
# Only hit by blockfs migration
# returns the same config for one call per
# response type. Is later deduped before saving to json
return _config
else:
raise KeyError(f"The key:{key} is not in the ensemble configuration")

def hasNodeGenData(self, key: str) -> bool:
return key in self.response_configs and isinstance(
self.response_configs[key], GenDataConfig
)
if "gen_data" not in self.response_configs:
return False

config = self.response_configs["gen_data"]
return key in config.keys

def addNode(self, config_node: Union[ParameterConfig, ResponseConfig]) -> None:
assert config_node is not None
Expand Down
Loading