From 8948757b656ffcc32b930192ae9aeb94bb8e207b Mon Sep 17 00:00:00 2001
From: Danilo Lessa Bernardineli
Date: Thu, 21 Dec 2023 13:44:14 -0300
Subject: [PATCH] Add row_count test + refactors on the final results
 processing + test parametrizations (#336)

* add tests for checking keys
* add tests
* fix issues #335 & #332
* parametrize test_runs
* add more tests
* fix subset ordering
---
 cadCAD/configuration/utils/__init__.py   |   2 +-
 cadCAD/engine/__init__.py                |  46 +++++++--
 cadCAD/engine/execution.py               |  61 ++++++------
 cadCAD/types.py                          |   8 +-
 testing/test_row_count.py                |  67 +++++++++++++
 testing/test_runs.py                     | 117 ++++++++++++-----------
 testing/tests/cadCAD_memory_address.json |   2 +-
 7 files changed, 203 insertions(+), 100 deletions(-)
 create mode 100644 testing/test_row_count.py

diff --git a/cadCAD/configuration/utils/__init__.py b/cadCAD/configuration/utils/__init__.py
index 7ebac29a..8359c1b9 100644
--- a/cadCAD/configuration/utils/__init__.py
+++ b/cadCAD/configuration/utils/__init__.py
@@ -104,7 +104,7 @@ def ep_decorator(f, y, var_dict, sub_step, sL, s, _input, **kwargs):
         else:
             return y, s[y]

-    return {es: ep_decorator(f, es) for es, f in ep.items()}
+    return {es: ep_decorator(f, es) for es, f in ep.items()}  # type: ignore


 def trigger_condition(s, pre_conditions, cond_opp):
diff --git a/cadCAD/engine/__init__.py b/cadCAD/engine/__init__.py
index 24d977d4..4dc33409 100644
--- a/cadCAD/engine/__init__.py
+++ b/cadCAD/engine/__init__.py
@@ -1,5 +1,5 @@
 from time import time
-from typing import Callable, Dict, List, Any, Tuple, Union
+from typing import Callable, Dict, List, Any, Tuple, Union, Sequence, Mapping
 from tqdm.auto import tqdm

 from cadCAD.utils import flatten
@@ -147,18 +147,50 @@ def get_final_results(simulations: List[StateHistory],
                               eps,
                               sessions: List[SessionDict],
                               remote_threshold: int):
+
+            # if list of lists of lists of dicts: do flatten
+            # if list of dicts: do not flatten
+            # else raise error
+
+
+            init: bool = isinstance(simulations, Sequence)
+            failed_1 = False
+            failed_2 = False
+
+            try:
+                init: bool = isinstance(simulations, Sequence)
+                dont_flatten = init & isinstance(simulations[0], Mapping)
+                do_flatten = not dont_flatten
+            except:
+                failed_1 = True
+                do_flatten = True
+
+            try:
+                do_flatten = init & isinstance(simulations[0], Sequence)
+                do_flatten &= isinstance(simulations[0][0], Sequence)
+                do_flatten &= isinstance(simulations[0][0][0], Mapping)
+            except:
+                failed_2 = True
+                do_flatten = False
+
+            if failed_1 and failed_2:
+                raise ValueError('Invalid simulation results (Executor output is not list[dict] or list[list[list[dict]]])')
+
             flat_timesteps, tensor_fields = [], []
             for sim_result, psu, ep in tqdm(list(zip(simulations, psus, eps)),
                                             total=len(simulations),
                                             desc='Flattening results'):
-                flat_timesteps.append(flatten(sim_result))
+                if do_flatten:
+                    flat_timesteps.append(flatten(sim_result))
                 tensor_fields.append(create_tensor_field(psu, ep))

+            if do_flatten:
+                flat_simulations = flatten(flat_timesteps)
+            else:
+                flat_simulations = simulations

-            flat_simulations = flatten(flat_timesteps)
-            if config_amt == 1:
-                return simulations, tensor_fields, sessions
-            elif config_amt > 1:
-                return flat_simulations, tensor_fields, sessions
+            return flat_simulations, tensor_fields, sessions

         final_result = None
         original_N = len(configs_as_dicts(self.configs))
diff --git a/cadCAD/engine/execution.py b/cadCAD/engine/execution.py
index 4a26218f..97a5fa87 100644
--- a/cadCAD/engine/execution.py
+++ b/cadCAD/engine/execution.py
@@ -1,4 +1,4 @@
-from typing import Callable, Dict, List, Any, Tuple
+from typing import Callable, Dict, List, Any, Tuple, Sequence
 from pathos.multiprocessing import ProcessPool  # type: ignore
 from collections import Counter
 from cadCAD.types import *
@@ -11,41 +11,38 @@
 def single_proc_exec(
-    simulation_execs: List[ExecutorFunction],
-    var_dict_list: List[Parameters],
-    states_lists: List[StateHistory],
-    configs_structs: List[StateUpdateBlocks],
-    env_processes_list: List[EnvProcesses],
-    Ts: List[TimeSeq],
-    SimIDs: List[SimulationID],
-    Ns: List[Run],
-    ExpIDs: List[int],
-    SubsetIDs: List[SubsetID],
-    SubsetWindows: List[SubsetWindow],
-    configured_n: List[N_Runs],
+    simulation_execs: Sequence[ExecutorFunction],
+    var_dict_list: Union[Sequence[Parameters], Parameters],
+    states_lists: Sequence[StateHistory],
+    configs_structs: Sequence[StateUpdateBlocks],
+    env_processes_list: Sequence[EnvProcesses],
+    Ts: Sequence[TimeSeq],
+    SimIDs: Sequence[SimulationID],
+    Ns: Sequence[Run],
+    ExpIDs: Sequence[int],
+    SubsetIDs: Sequence[SubsetID],
+    SubsetWindows: Sequence[SubsetWindow],
+    configured_n: Sequence[N_Runs],
     additional_objs=None
-):
+) -> List:
+

-    # HACK for making it run with N_Runs=1
-    if type(var_dict_list) == list:
-        var_dict_list = var_dict_list[0]
+    if not isinstance(var_dict_list, Sequence):
+        var_dict_list = list([var_dict_list])

-    print(f'Execution Mode: single_threaded')
-    raw_params: List[List] = [
+    raw_params = (
         simulation_execs, states_lists, configs_structs, env_processes_list,
-        Ts, SimIDs, Ns, SubsetIDs, SubsetWindows
-    ]
-    simulation_exec, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window = list(
-        map(lambda x: x.pop(), raw_params)
-    )
-    result = simulation_exec(
-        var_dict_list, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n, additional_objs
-    )
-    return flatten(result)
-
-
-
-
+        Ts, SimIDs, Ns, SubsetIDs, SubsetWindows, var_dict_list)
+
+    results: List = []
+    print(f'Execution Mode: single_threaded')
+    for raw_param in zip(*raw_params):
+        simulation_exec, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, var_dict = raw_param
+        result = simulation_exec(
+            var_dict, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n, additional_objs
+        )
+        results.append(flatten(result))
+    return flatten(results)


 def parallelize_simulations(
     simulation_execs: List[ExecutorFunction],
diff --git a/cadCAD/types.py b/cadCAD/types.py
index d5b9302f..d7bf731a 100644
--- a/cadCAD/types.py
+++ b/cadCAD/types.py
@@ -1,4 +1,4 @@
-from typing import TypedDict, Callable, Union, Dict, List, Tuple, Iterator
+from typing import TypedDict, Callable, Union, Dict, List, Tuple, Iterable
 from collections import deque

 State = Dict[str, object]
@@ -20,18 +20,18 @@ class StateUpdateBlock(TypedDict):
 StateUpdateBlocks = List[StateUpdateBlock]

 class ConfigurationDict(TypedDict):
-    T: Iterator  # Generator for the timestep variable
+    T: Iterable  # Generator for the timestep variable
     N: int  # Number of MC Runs
     M: Union[Parameters, SweepableParameters]  # Parameters / List of Parameter to Sweep

 TargetValue = object
 EnvProcess: Callable[[State, SweepableParameters, TargetValue], TargetValue]
 EnvProcesses = Dict[str, Callable]
-TimeSeq = Iterator
+TimeSeq = Iterable
 SimulationID = int
 Run = int
 SubsetID = int
-SubsetWindow = Iterator
+SubsetWindow = Iterable
 N_Runs = int

 ExecutorFunction = Callable[[Parameters, StateHistory, StateUpdateBlocks, EnvProcesses, TimeSeq, SimulationID, Run, SubsetID, SubsetWindow, N_Runs], object]
diff --git a/testing/test_row_count.py b/testing/test_row_count.py
new file mode 100644
index 00000000..a1d78f14
--- /dev/null
+++ b/testing/test_row_count.py
@@ -0,0 +1,67 @@
+from cadCAD.configuration import Experiment
+from cadCAD.configuration.utils import config_sim
+from cadCAD.engine import Executor, ExecutionContext, ExecutionMode
+import pytest
+
+
+CONFIG_SIGNATURES_TO_TEST = [(3, 3, 3, 3, 3), (1, 3, 3, 3, 3),
+                             (3, 1, 3, 3, 3), (1, 1, 3, 3, 3),
+                             (3, 3, 1, 3, 3), (1, 3, 1, 3, 3), (1, 1, 1, 3, 3)]
+
+
+def run_experiment(exp: Experiment, mode: str):
+    exec_context = ExecutionContext(mode)
+    executor = Executor(exec_context=exec_context, configs=exp.configs)
+    (records, tensor_field, _) = executor.execute()
+    return records
+
+
+def create_experiments(N_simulations=3, N_sweeps=3, N_runs=3, N_timesteps=3, N_substeps=3) -> Experiment:
+
+    INITIAL_STATE = {'varA': None}
+    PSUBs = [{'policies': {}, 'variables': {}}] * N_substeps
+
+    params = {'A': [None] * N_sweeps,
+              'B': [None]}
+
+    SIM_CONFIG = config_sim(
+        {
+            "N": N_runs,
+            "T": range(N_timesteps),
+            "M": params,  # Optional
+        }
+    )
+
+    exp = Experiment()
+    for i_sim in range(N_simulations):
+        exp.append_model(
+            sim_configs=SIM_CONFIG,
+            initial_state=INITIAL_STATE,
+            partial_state_update_blocks=PSUBs
+        )
+    return exp
+
+
+def expected_rows(N_simulations, N_sweeps, N_runs, N_timesteps, N_substeps) -> int:
+    return N_simulations * N_sweeps * N_runs * (N_timesteps * N_substeps + 1)
+
+
+@pytest.mark.parametrize("N_sim,N_sw,N_r,N_t,N_s", CONFIG_SIGNATURES_TO_TEST)
+def test_row_count_single(N_sim, N_sw, N_r, N_t, N_s):
+    args = (N_sim, N_sw, N_r, N_t, N_s)
+    assert len(run_experiment(create_experiments(*args), 'single_proc')) == expected_rows(*args)
+
+
+@pytest.mark.parametrize("N_sim,N_sw,N_r,N_t,N_s", CONFIG_SIGNATURES_TO_TEST)
+def test_row_count_multi(N_sim, N_sw, N_r, N_t, N_s):
+    args = (N_sim, N_sw, N_r, N_t, N_s)
+
+    if N_sim == 1 and N_sw == 1 and N_r == 1:
+        with pytest.raises(ValueError) as e_info:
+            assert len(run_experiment(create_experiments(*args), 'multi_proc')) == expected_rows(*args)
+    else:
+        assert len(run_experiment(create_experiments(*args), 'multi_proc')) == expected_rows(*args)
+
+@pytest.mark.parametrize("N_sim,N_sw,N_r,N_t,N_s", CONFIG_SIGNATURES_TO_TEST)
+def test_row_count_local(N_sim, N_sw, N_r, N_t, N_s):
+    args = (N_sim, N_sw, N_r, N_t, N_s)
+    assert len(run_experiment(create_experiments(*args), 'local_proc')) == expected_rows(*args)
diff --git a/testing/test_runs.py b/testing/test_runs.py
index 2c33c2b4..69d1b149 100644
--- a/testing/test_runs.py
+++ b/testing/test_runs.py
@@ -1,12 +1,14 @@
-from typing import Dict, List
+from typing import Dict, List, Optional
 from cadCAD.engine import Executor, ExecutionContext, ExecutionMode
 from cadCAD.configuration import Experiment
 from cadCAD.configuration.utils import env_trigger, var_substep_trigger, config_sim, psub_list
 from cadCAD.types import *
-import pandas as pd # type: ignore
+import pandas as pd  # type: ignore
 import types
 import inspect
 import pytest
+from pandas import DataFrame
+

 def describe_or_return(v: object) -> object:
     """
@@ -55,65 +57,59 @@ def assign_params(_df: pd.DataFrame, configs) -> pd.DataFrame:
     df = _df.assign(**first_param_dict).copy()
     for i, (_, subset_df) in enumerate(df.groupby(['simulation', 'subset', 'run'])):
         df.loc[subset_df.index] = subset_df.assign(**select_config_M_dict(configs,
-                                                                          i,
-                                                                          selected_params))
+                                                                           i,
+                                                                           selected_params))
     return df

-
-
 SWEEP_PARAMS: Dict[str, List] = {
-    'alpha': [1],
-    'beta': [lambda x: 2 * x, lambda x: x],
-    'gamma': [3, 4],
-    'omega': [7]
-    }
+    'alpha': [1],
+    'beta': [lambda x: 2 * x, lambda x: x, lambda x: x / 2],
+    'gamma': [3, 4, 5],
+    'omega': [7]
+}

 SINGLE_PARAMS: Dict[str, object] = {
-    'alpha': 1,
-    'beta': lambda x: x,
-    'gamma': 3,
-    'omega': 5
-    }
+    'alpha': 1,
+    'beta': lambda x: x,
+    'gamma': 3,
+    'omega': 5
+}


-def create_experiment(N_RUNS=2, N_TIMESTEPS=3, params: dict=SWEEP_PARAMS):
+def create_experiment(N_RUNS=2, N_TIMESTEPS=3, params: dict = SWEEP_PARAMS):
     psu_steps = ['m1', 'm2', 'm3']
     system_substeps = len(psu_steps)
     var_timestep_trigger = var_substep_trigger([0, system_substeps])
     env_timestep_trigger = env_trigger(system_substeps)
     env_process = {}

-    # ['s1', 's2', 's3', 's4']
     # Policies per Mechanism
+
     def gamma(params: Parameters, substep: Substep, history: StateHistory, state: State, **kwargs):
         return {'gamma': params['gamma']}

-
     def omega(params: Parameters, substep: Substep, history: StateHistory, state: State, **kwarg):
         return {'omega': params['omega']}

-
     # Internal States per Mechanism
+
     def alpha(params: Parameters, substep: Substep, history: StateHistory, state: State, _input: PolicyOutput, **kwargs):
         return 'alpha_var', params['alpha']

-
     def beta(params: Parameters, substep: Substep, history: StateHistory, state: State, _input: PolicyOutput, **kwargs):
         return 'beta_var', params['beta']
-    
+
     def gamma_var(params: Parameters, substep: Substep, history: StateHistory, state: State, _input: PolicyOutput, **kwargs):
         return 'gamma_var', params['gamma']
-    
+
     def omega_var(params: Parameters, substep: Substep, history: StateHistory, state: State, _input: PolicyOutput, **kwargs):
         return 'omega_var', params['omega']

-
     def policies(params: Parameters, substep: Substep, history: StateHistory, state: State, _input: PolicyOutput, **kwargs):
         return 'policies', _input

-
     def sweeped(params: Parameters, substep: Substep, history: StateHistory, state: State, _input: PolicyOutput, **kwargs):
         return 'sweeped', {'beta': params['beta'], 'gamma': params['gamma']}
@@ -126,8 +122,8 @@ def sweeped(params: Parameters, substep: Substep, history: StateHistory, state:
         psu_block[m]["states"]['gamma_var'] = gamma_var
         psu_block[m]["states"]['omega_var'] = omega_var
         psu_block[m]['states']['policies'] = policies
-        psu_block[m]["states"]['sweeped'] = var_timestep_trigger(y='sweeped', f=sweeped)
-
+        psu_block[m]["states"]['sweeped'] = var_timestep_trigger(
+            y='sweeped', f=sweeped)

     # Genesis States
     genesis_states = {
@@ -140,13 +136,14 @@ def sweeped(params: Parameters, substep: Substep, history: StateHistory, state:
     }

     # Environment Process
-    env_process['sweeped'] = env_timestep_trigger(trigger_field='timestep', trigger_vals=[5], funct_list=[lambda _g, x: _g['beta']])
+    env_process['sweeped'] = env_timestep_trigger(trigger_field='timestep', trigger_vals=[
+                                                  5], funct_list=[lambda _g, x: _g['beta']])

     sim_config = config_sim(
         {
             "N": N_RUNS,
             "T": range(N_TIMESTEPS),
-            "M": params, # Optional
+            "M": params,  # Optional
         }
     )
@@ -162,49 +159,59 @@ def sweeped(params: Parameters, substep: Substep, history: StateHistory, state:
     )
     return exp

-
-def test_mc_sweep_experiment():
-    experiment_assertions(create_experiment(N_RUNS=2, N_TIMESTEPS=2, params=SWEEP_PARAMS), ExecutionMode.local_mode)
-    experiment_assertions(create_experiment(N_RUNS=2, N_TIMESTEPS=2, params=SWEEP_PARAMS), ExecutionMode.single_mode)
-    experiment_assertions(create_experiment(N_RUNS=2, N_TIMESTEPS=2, params=SWEEP_PARAMS), ExecutionMode.multi_mode)
-
-def test_unique_sweep_experiment():
-    experiment_assertions(create_experiment(N_RUNS=1, N_TIMESTEPS=2, params=SWEEP_PARAMS), ExecutionMode.local_mode)
-    experiment_assertions(create_experiment(N_RUNS=1, N_TIMESTEPS=2, params=SWEEP_PARAMS), ExecutionMode.single_mode)
-    experiment_assertions(create_experiment(N_RUNS=1, N_TIMESTEPS=2, params=SWEEP_PARAMS), ExecutionMode.multi_mode)
-
-def test_mc_single_experiment():
-    experiment_assertions(create_experiment(N_RUNS=2, N_TIMESTEPS=2, params=SINGLE_PARAMS), ExecutionMode.local_mode)
-    experiment_assertions(create_experiment(N_RUNS=2, N_TIMESTEPS=2, params=SINGLE_PARAMS), ExecutionMode.single_mode)
-    experiment_assertions(create_experiment(N_RUNS=2, N_TIMESTEPS=2, params=SINGLE_PARAMS), ExecutionMode.multi_mode)
-
-def test_unique_single_experiment():
-    experiment_assertions(create_experiment(N_RUNS=1, N_TIMESTEPS=2, params=SINGLE_PARAMS), ExecutionMode.local_mode)
-    experiment_assertions(create_experiment(N_RUNS=1, N_TIMESTEPS=2, params=SINGLE_PARAMS), ExecutionMode.single_mode)
-    with pytest.raises(ValueError) as e_info:
-        experiment_assertions(create_experiment(N_RUNS=1, N_TIMESTEPS=2, params=SINGLE_PARAMS), ExecutionMode.multi_mode)
-
+@pytest.mark.parametrize("mode", ["local_proc", "single_proc", "multi_proc"])
+def test_mc_sweep_experiment(mode):
+    experiment_assertions(create_experiment(
+        N_RUNS=2, N_TIMESTEPS=2, params=SWEEP_PARAMS), mode)
+
+@pytest.mark.parametrize("mode", ["local_proc", "single_proc", "multi_proc"])
+def test_unique_sweep_experiment(mode):
+    experiment_assertions(create_experiment(
+        N_RUNS=1, N_TIMESTEPS=2, params=SWEEP_PARAMS), mode)
+
+@pytest.mark.parametrize("mode", ["local_proc", "single_proc", "multi_proc"])
+def test_mc_single_experiment(mode):
+    experiment_assertions(create_experiment(
+        N_RUNS=2, N_TIMESTEPS=2, params=SINGLE_PARAMS), mode)
+
+@pytest.mark.parametrize("mode", ["local_proc", "single_proc", "multi_proc"])
+def test_unique_single_experiment(mode):
+    if mode == "multi_proc":
+        with pytest.raises(ValueError) as e_info:
+            experiment_assertions(create_experiment(
+                N_RUNS=1, N_TIMESTEPS=2, params=SINGLE_PARAMS), mode)
+    else:
+        experiment_assertions(create_experiment(
+            N_RUNS=1, N_TIMESTEPS=2, params=SINGLE_PARAMS), mode)

-def experiment_assertions(exp, mode=None):
+def experiment_assertions(exp: Experiment, mode: Optional[str]=None) -> None:
     if mode == None:
         mode = ExecutionMode().local_mode
     exec_context = ExecutionContext(mode)
     executor = Executor(exec_context=exec_context, configs=exp.configs)
     (records, tensor_field, _) = executor.execute()
-    df = drop_substeps(assign_params(pd.DataFrame(records), exp.configs))
+
+    df: DataFrame = assign_params(pd.DataFrame(records), exp.configs)
+    df = drop_substeps(df)

     # XXX: parameters should always be of the same type. Else, the test will fail
     first_sim_config = exp.configs[0].sim_config['M']
+    required_keys = {'simulation': int,
+                     'run': int,
+                     'subset': int,
+                     'timestep': int}

     for (i, row) in df.iterrows():
         if row.timestep > 0:
-            
+
             assert row['alpha_var'] == row['alpha']
             assert type(row['alpha_var']) == type(first_sim_config['alpha'])
             assert row['gamma_var'] == row['gamma']
             assert type(row['gamma_var']) == type(first_sim_config['gamma'])
             assert row['omega_var'] == row['omega']
             assert type(row['omega_var']) == type(first_sim_config['omega'])
-            
+
+            for k, v in required_keys.items():
+                assert k in row
+                assert type(row[k]) == v
diff --git a/testing/tests/cadCAD_memory_address.json b/testing/tests/cadCAD_memory_address.json
index 859ec289..c60ead7b 100644
--- a/testing/tests/cadCAD_memory_address.json
+++ b/testing/tests/cadCAD_memory_address.json
@@ -1 +1 @@
-{"memory_address": "0x10fbedd50"}
\ No newline at end of file
+{"memory_address": "0x111857380"}
\ No newline at end of file