From 93d63259a62eb377fb93af822adf1524c983d01a Mon Sep 17 00:00:00 2001 From: simon-p-2000 Date: Thu, 13 Feb 2025 11:23:18 +0100 Subject: [PATCH] Activity count feature addition (#249) --- CHANGELOG.md | 1 + docs/datasets.md | 4 +- src/sleepecg/io/sleep_readers.py | 150 ++++++++++++++++++++++++++++++- tests/test_sleep_readers.py | 103 ++++++++++++++++++++- 4 files changed, 253 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a37478a1..ae169f8a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## [0.5.9] - 2025-02-01 ### Added - Add support to store NSRR token in environment variable or user config ([#243](https://github.com/cbrnr/sleepecg/pull/243) by [Simon Pusterhofer](https://github.com/simon-p-2000)) +- Add support for downloading and storing activity counts for the MESA dataset ([#249](https://github.com/cbrnr/sleepecg/pull/249) by [Simon Pusterhofer](https://github.com/simon-p-2000)) - Add Python 3.13+ support by transforming wheel builds using ABI3 mode ([#251](https://github.com/cbrnr/sleepecg/pull/251) by [Eric Larson](https://github.com/larsoner)) ### Changed diff --git a/docs/datasets.md b/docs/datasets.md index 5bf5c402..3c9b8620 100644 --- a/docs/datasets.md +++ b/docs/datasets.md @@ -44,7 +44,7 @@ Instead of always using [`set_nsrr_token()`](sleepecg.set_nsrr_token), you can s SleepECG checks for the NSRR token in the following order: -1. Token set via [`set_nsrr_token()`][sleepecg.set_nsrr_token] +1. Token set via [`set_nsrr_token()`](sleepecg.set_nsrr_token) 2. Token set via environment variable `NSRR_TOKEN` 3. Token set in the user configuration @@ -59,6 +59,8 @@ set_nsrr_token("") mesa = read_mesa(records_pattern="00*") # note that this is a generator ``` +SleepECG supports downloading and storing activity counts for the MESA dataset. These metrics quantify a subject's movement based on accelerometer measurements recorded and processed using a proprietary algorithm in Philips Actiware. To access activity counts, call [`read_mesa()`](sleepecg.read_mesa) with `activity_source='actigraphy'` to download the data or `activity_source='cached'` to use previously downloaded counts. + !!! note Reader functions are generators, so they do not return the data directly. To access the data, you need to consume the generator, either by iterating over it or with subsequent calls of `next()`. diff --git a/src/sleepecg/io/sleep_readers.py b/src/sleepecg/io/sleep_readers.py index 5fc2a743..714eb5c6 100644 --- a/src/sleepecg/io/sleep_readers.py +++ b/src/sleepecg/io/sleep_readers.py @@ -8,6 +8,7 @@ import csv import datetime +import os from collections.abc import Iterator from dataclasses import dataclass from enum import IntEnum @@ -86,6 +87,8 @@ class SleepRecord: Times of heartbeats relative to recording start in seconds, by default `None`. subject_data : SubjectData, optional Dataclass containing subject data (such as gender or age), by default `None`. + activity_counts: np.ndarray, optional + Activity counts according to Actiwatch actigraphy, by default `None`. """ sleep_stages: np.ndarray | None = None @@ -94,12 +97,14 @@ class SleepRecord: recording_start_time: datetime.time | None = None heartbeat_times: np.ndarray | None = None subject_data: SubjectData | None = None + activity_counts: np.ndarray | None = None class _ParseNsrrXmlResult(NamedTuple): sleep_stages: np.ndarray sleep_stage_duration: int recording_start_time: datetime.time + recording_duration: float def _parse_nsrr_xml(xml_filepath: Path) -> _ParseNsrrXmlResult: @@ -120,6 +125,8 @@ def _parse_nsrr_xml(xml_filepath: Path) -> _ParseNsrrXmlResult: Duration of each sleep stage in seconds. recording_start_time : datetime.time Time at which the recording was started. + recording_duration: float + Duration of the recording in seconds. """ STAGE_MAPPING = { @@ -139,6 +146,12 @@ def _parse_nsrr_xml(xml_filepath: Path) -> _ParseNsrrXmlResult: raise RuntimeError(f"EpochLength not found in {xml_filepath}.") epoch_length = int(epoch_length) + scored_event = root.find(".//ScoredEvent") + if scored_event is not None: + recording_duration = float(scored_event.findtext(".//Duration", "")) + else: + raise RuntimeError(f"Recording duration not found in {xml_filepath}.") + start_time = None annot_stages = [] @@ -157,9 +170,7 @@ def _parse_nsrr_xml(xml_filepath: Path) -> _ParseNsrrXmlResult: raise RuntimeError(f"'Recording Start Time' not found in {xml_filepath}.") return _ParseNsrrXmlResult( - np.array(annot_stages, dtype=np.int8), - epoch_length, - start_time, + np.array(annot_stages, dtype=np.int8), epoch_length, start_time, recording_duration ) @@ -169,6 +180,7 @@ def read_mesa( offline: bool = False, keep_edfs: bool = False, data_dir: str | Path | None = None, + activity_source: str | None = None, ) -> Iterator[SleepRecord]: """ Lazily read records from [MESA](https://sleepdata.org/datasets/mesa). @@ -197,6 +209,10 @@ def read_mesa( data_dir : str | pathlib.Path, optional Directory where all datasets are stored. If `None` (default), the value will be taken from the configuration. + activity_source : {'actigraphy', 'cached', None}, optional + If `None` (default), actigraphy data will not be downloaded. If `'actigraphy'`, + download actigraphy data from MESA dataset. If `'cached'`, get the cached activity + counts. Yields ------ @@ -208,7 +224,10 @@ def read_mesa( DB_SLUG = "mesa" ANNOTATION_DIRNAME = "polysomnography/annotations-events-nsrr" EDF_DIRNAME = "polysomnography/edfs" + ACTIVITY_DIRNAME = "actigraphy" + OVERLAP_DIRNAME = "overlap" HEARTBEATS_DIRNAME = "preprocessed/heartbeats" + ACTIVITY_COUNTS_DIRNAME = "preprocessed/activity_counts" RPOINTS_DIRNAME = "polysomnography/annotations-rpoints" GENDER_MAPPING = {0: Gender.FEMALE, 1: Gender.MALE} @@ -220,6 +239,13 @@ def read_mesa( f"possible options: {heartbeats_source_options}" ) + activity_source_options = {None, "cached", "actigraphy"} + if activity_source not in activity_source_options: + raise ValueError( + f"Invalid value for parameter `activity_source`: {activity_source}, " + f"possible options: {activity_source_options}" + ) + if data_dir is None: data_dir = get_config_value("data_dir") @@ -231,6 +257,13 @@ def read_mesa( for directory in (annotations_dir, edf_dir, heartbeats_dir): directory.mkdir(parents=True, exist_ok=True) + if activity_source is not None: + activity_dir = db_dir / ACTIVITY_DIRNAME + activity_counts_dir = db_dir / ACTIVITY_COUNTS_DIRNAME + overlap_dir = db_dir / OVERLAP_DIRNAME + for directory in (activity_dir, activity_counts_dir, overlap_dir): + directory.mkdir(parents=True, exist_ok=True) + if not offline: download_url = _get_nsrr_url(DB_SLUG) @@ -271,10 +304,37 @@ def read_mesa( shallow=True, ) checksums.update(rpoints_files) + + if activity_source is not None: + activity_files = _list_nsrr( + db_slug=DB_SLUG, + subfolder="actigraphy", + pattern=f"mesa-sleep-{records_pattern}.csv", + shallow=True, + ) + checksums.update(activity_files) + overlap_filename, overlap_checksum = _list_nsrr( + db_slug="mesa", subfolder="overlap", shallow=True + )[0] + overlap_filepath = db_dir / overlap_filename + _download_nsrr_file( + download_url + overlap_filename, + target_filepath=overlap_filepath, + checksum=overlap_checksum, + ) + else: subject_data_filepath = next((db_dir / "datasets").glob("mesa-sleep-dataset-*.csv")) xml_paths = annotations_dir.glob(f"mesa-sleep-{records_pattern}-nsrr.xml") requested_records = sorted([file.stem[:-5] for file in xml_paths]) + if activity_source == "actigraphy": + overlap_filename = "mesa-actigraphy-psg-overlap.csv" + overlap_filepath = overlap_dir / overlap_filename + if not overlap_filepath.is_file(): + raise RuntimeError( + "Overlap file not found, make sure it is in the correct directory " + "overlap/mesa-actigraphy-psg-overlap.csv." + ) subject_data_array = np.loadtxt( subject_data_filepath, @@ -291,6 +351,16 @@ def read_mesa( age=age, ) + if activity_source == "actigraphy": + overlap_data = {} + + with open(overlap_filepath) as csv_file: + overlap_reader = csv.DictReader(csv_file, delimiter=",") + for entry in overlap_reader: + mesaid = int(entry["mesaid"]) + line = int(entry["line"]) + overlap_data[mesaid] = line + for record_id in requested_records: heartbeats_file = heartbeats_dir / f"{record_id}.npy" if heartbeats_source == "annotation": @@ -350,6 +420,79 @@ def read_mesa( parsed_xml = _parse_nsrr_xml(xml_filepath) + activity_counts = None + if activity_source is not None: + activity_counts_file = activity_counts_dir / f"{record_id}-activity-counts.npy" + if activity_source == "cached": + if not activity_counts_file.is_file(): + print(f"Skipping {record_id} due to missing cached activity counts.") + continue + activity_counts = np.load(activity_counts_file) + else: + mesaid = int(record_id.split("-")[2]) + if mesaid not in overlap_data: + print(f"Skipping {record_id} due to missing overlap data.") + continue + + activity_filename = ACTIVITY_DIRNAME + f"/{record_id}.csv" + activity_filepath = db_dir / activity_filename + if not offline: + _download_nsrr_file( + download_url + activity_filename, + activity_filepath, + checksums[activity_filename], + ) + + if not os.path.exists(activity_filepath): + print(f"Skipping {record_id} due to missing activity data.") + continue + + activity_data = [] + + with open(activity_filepath) as csv_file: + reader = csv.reader(csv_file, delimiter=",") + header = next(reader) + for row in reader: + activity_data.append(dict(zip(header, row))) + + recording_start_time = parsed_xml.recording_start_time + recording_duration = parsed_xml.recording_duration + recording_end_time = datetime.datetime.combine( + datetime.datetime.today(), recording_start_time + ) + datetime.timedelta(seconds=recording_duration) + + recording_end_time_seconds = recording_end_time.second + rounding_seconds = ( + (30 - recording_end_time_seconds % 30) + if recording_end_time_seconds % 30 >= 15 + else -(recording_end_time_seconds % 30) + ) + + recording_end_time = recording_end_time + datetime.timedelta( + seconds=rounding_seconds + ) + recording_end_time_str = recording_end_time.strftime("%H:%M:%S").lstrip("0") + + start_line = overlap_data[mesaid] + 1 + + for item in activity_data: + if item.get("linetime") == recording_end_time_str: + end_line = int(item["line"]) - 1 + break + else: + print( + f"Skipping {record_id} due to missing line matching " + f"{recording_end_time_str}." + ) + continue + + activity_counts = [ + item["activity"] for item in activity_data[start_line - 1 : end_line] + ] + + activity_counts = np.array(activity_counts) + np.save(activity_counts_file, activity_counts) + yield SleepRecord( sleep_stages=parsed_xml.sleep_stages, sleep_stage_duration=parsed_xml.sleep_stage_duration, @@ -357,6 +500,7 @@ def read_mesa( recording_start_time=parsed_xml.recording_start_time, heartbeat_times=heartbeat_times, subject_data=subject_data[record_id], + activity_counts=activity_counts, ) diff --git a/tests/test_sleep_readers.py b/tests/test_sleep_readers.py index d717bfb6..8901c82e 100644 --- a/tests/test_sleep_readers.py +++ b/tests/test_sleep_readers.py @@ -17,6 +17,34 @@ from sleepecg.io.sleep_readers import Gender +def _dummy_nsrr_overlap(filename: str, mesa_ids: list[int]): + with open(filename, "w") as csv: + csv.write("mesaid,line,linetime,starttime_psg\n") + for i in range(len(mesa_ids)): + csv.write(f"{mesa_ids[i][-1]},1,20:30:00,20:29:59\n") + + +def _dummy_nsrr_actigraphy(filename: str, mesa_id: str): + """Create dummy actigraphy file with four usable activity counts.""" + base_time = datetime.datetime(2024, 1, 1, 20, 30, 0) + + linetimes = [ + (base_time + datetime.timedelta(seconds=30 * i)).strftime("%H:%M:%S") + for i in range(10) + ] + + with open(filename, "w") as csv: + csv.write("mesaid,line,linetime,activity\n") + for i in range(10): + csv.write(f"{mesa_id[-1]},{1 + i},{linetimes[i]},10\n") + + +def _dummy_nsrr_actigraphy_cached(filename: str): + """Create dummy npy file that resembles cached activity counts.""" + activity_counts = np.array([10, 10, 10, 10, 10, 10]) + np.save(filename, activity_counts) + + def _dummy_nsrr_edf(filename: str, hours: float, ecg_channel: str): ecg_5_min, fs = get_toy_ecg() seconds = int(hours * 60 * 60) @@ -26,6 +54,7 @@ def _dummy_nsrr_edf(filename: str, hours: float, ecg_channel: str): def _dummy_nsrr_xml(filename: str, hours: float, random_state: int): EPOCH_LENGTH = 30 + RECORDING_DURATION = 154.0 STAGES = [ "Wake|0", "Stage 1 sleep|1", @@ -47,6 +76,7 @@ def _dummy_nsrr_xml(filename: str, hours: float, random_state: int): "\n" "\n" "Recording Start Time\n" + f"{RECORDING_DURATION}\n" "01.01.85 20.29.59\n" "\n", ) @@ -72,24 +102,46 @@ def _dummy_nsrr_xml(filename: str, hours: float, random_state: int): ) -def _create_dummy_mesa(data_dir: str, durations: list[float], random_state: int = 42): +def _create_dummy_mesa( + data_dir: str, durations: list[float], random_state: int = 42, actigraphy: bool = False +): DB_SLUG = "mesa" ANNOTATION_DIRNAME = "polysomnography/annotations-events-nsrr" EDF_DIRNAME = "polysomnography/edfs" CSV_DIRNAME = "datasets" + OVERLAP_DIRNAME = "overlap" + ACTIVITY_DIRNAME = "actigraphy" + ACTIVITY_COUNTS_DIRNAME = "preprocessed/activity_counts" db_dir = Path(data_dir).expanduser() / DB_SLUG annotations_dir = db_dir / ANNOTATION_DIRNAME edf_dir = db_dir / EDF_DIRNAME csv_dir = db_dir / CSV_DIRNAME + overlap_dir = db_dir / OVERLAP_DIRNAME + activity_dir = db_dir / ACTIVITY_DIRNAME + activity_counts_dir = db_dir / ACTIVITY_COUNTS_DIRNAME for directory in (annotations_dir, edf_dir, csv_dir): directory.mkdir(parents=True, exist_ok=True) + if actigraphy: + for directory in (overlap_dir, activity_dir, activity_counts_dir): + directory.mkdir(parents=True, exist_ok=True) + record_ids = [] + for i, hours in enumerate(durations): record_id = f"mesa-sleep-{i:04}" _dummy_nsrr_edf(f"{edf_dir}/{record_id}.edf", hours, ecg_channel="EKG") _dummy_nsrr_xml(f"{annotations_dir}/{record_id}-nsrr.xml", hours, random_state) + if actigraphy: + _dummy_nsrr_actigraphy(f"{activity_dir}/{record_id}.csv", mesa_id=record_id) + _dummy_nsrr_actigraphy_cached( + f"{activity_counts_dir}/{record_id}-activity-counts.npy" + ) + record_ids.append(record_id) + + if actigraphy: + _dummy_nsrr_overlap(f"{overlap_dir}/mesa-actigraphy-psg-overlap.csv", record_ids) with open(csv_dir / "mesa-sleep-dataset-0.0.0.csv", "w") as csv: csv.write("mesaid,examnumber,race1c,gender1,cucmcn1c,sleepage5c\n") @@ -144,6 +196,55 @@ def test_read_mesa(tmp_path): assert set(rec.sleep_stages) - valid_stages == set() +def test_read_mesa_actigraphy(tmp_path): + """Basic sanity checks for records read via read_mesa including actigraphy.""" + durations = [0.1, 0.2] # hours + valid_stages = {int(s) for s in SleepStage} + + _create_dummy_mesa(data_dir=tmp_path, durations=durations, actigraphy=True) + records = list( + read_mesa( + data_dir=tmp_path, + heartbeats_source="ecg", + offline=True, + activity_source="actigraphy", + ) + ) + + assert len(records) == 2 + + for rec in records: + assert rec.sleep_stage_duration == 30 + assert set(rec.sleep_stages) - valid_stages == set() + assert len(rec.activity_counts) == 4 + assert Path( + f"{tmp_path}/mesa/preprocessed/activity_counts/{rec.id}-activity-counts.npy" + ).exists() + + +def test_read_mesa_actigraphy_cached(tmp_path): + """Basic sanity checks for records read via read_mesa including cached actigraphy.""" + durations = [0.1, 0.2] # hours + valid_stages = {int(s) for s in SleepStage} + + _create_dummy_mesa(data_dir=tmp_path, durations=durations, actigraphy=True) + records = list( + read_mesa( + data_dir=tmp_path, + heartbeats_source="ecg", + offline=True, + activity_source="cached", + ) + ) + + assert len(records) == 2 + + for rec in records: + assert rec.sleep_stage_duration == 30 + assert set(rec.sleep_stages) - valid_stages == set() + assert len(rec.activity_counts) == 6 + + def test_read_shhs(tmp_path): """Basic sanity checks for records read via read_shhs.""" durations = [0.1, 0.2] # hours