Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cohorte med outcome #904

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import polars as pl

from psycop.common.cohort_definition import (
CohortDefiner,
FilteredPredictionTimeBundle,
OutcomeTimestampFrame,
filter_prediction_times,
)
# Get timestamps of somatic admissions. These timestamps can be used both for filtering and as the outcome.
from psycop.projects.AcuteSomaticAdmission.CohortDefinition.get_somatic_emergency_visits import (
get_contacts_to_somatic_emergency,
)
#get timestamps for outpatient visits
from psycop.projects.AcuteSomaticAdmission.CohortDefinition.get_psychiatric_outpatient_visits import (
get_outpatient_visits_to_psychiatry,
)
from psycop.projects.AcuteSomaticAdmission.CohortDefinition.single_filters import (
SomaticAdmissionMinAgeFilter,
SomaticAdmissionMinDateFilter,
SomaticAdmissionWashoutMove,
SomaticAdmissionWashoutPriorSomaticAdmission,
)

class SomaticAdmissionCohortDefiner(CohortDefiner):
    """Cohort definition for the acute somatic admission project.

    Prediction times come from ``get_outpatient_visits_to_psychiatry`` and
    outcome timestamps come from ``get_contacts_to_somatic_emergency``.
    """

    @staticmethod
    def get_filtered_prediction_times_bundle(
        washout_on_prior_somatic_contacts: bool = True,
    ) -> FilteredPredictionTimeBundle:
        """Load the outpatient visits and run them through the cohort filters.

        Args:
            washout_on_prior_somatic_contacts: When true, the move washout and
                the prior-somatic-admission washout run after the minimum-date
                and minimum-age filters. When false, only the date and age
                filters run.

        Returns:
            The bundle produced by ``filter_prediction_times``.
        """
        visits = pl.from_pandas(get_outpatient_visits_to_psychiatry()).lazy()

        # The date and age filters always run, and always run first.
        base_steps = (SomaticAdmissionMinDateFilter(), SomaticAdmissionMinAgeFilter())
        washout_steps = (
            (SomaticAdmissionWashoutMove(), SomaticAdmissionWashoutPriorSomaticAdmission())
            if washout_on_prior_somatic_contacts
            else ()
        )

        return filter_prediction_times(
            prediction_times=visits,
            filtering_steps=base_steps + washout_steps,
            entity_id_col_name="dw_ek_borger",
        )

    @staticmethod
    def get_outcome_timestamps() -> OutcomeTimestampFrame:
        """Wrap the somatic admission timestamps in an ``OutcomeTimestampFrame``."""
        outcomes = pl.from_pandas(get_contacts_to_somatic_emergency())
        return OutcomeTimestampFrame(frame=outcomes)


if __name__ == "__main__":
    # Manual check: build the cohort once with and once without the washout filters.
    bundle = SomaticAdmissionCohortDefiner.get_filtered_prediction_times_bundle()
    bundle_no_washout = SomaticAdmissionCohortDefiner.get_filtered_prediction_times_bundle(
        washout_on_prior_somatic_contacts=False
    )

    # Pandas copies of the filtered prediction times, for inspection.
    df = bundle.prediction_times.frame.to_pandas()
    df_no_washout = bundle_no_washout.prediction_times.frame.to_pandas()

    outcome_timestamps = SomaticAdmissionCohortDefiner.get_outcome_timestamps()
18 changes: 18 additions & 0 deletions psycop/projects/AcuteSomaticAdmission/CohortDefinition/add_age.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import polars as pl

from psycop.common.feature_generation.loaders.raw.load_demographic import birthdays
from psycop.projects.forced_admission_outpatient.cohort.prediction_timestamp_filters.eligible_config import (
AGE_COL_NAME,
)


def add_age(df: pl.DataFrame) -> pl.DataFrame:
    """Add the age in years at ``timestamp`` as the ``AGE_COL_NAME`` column.

    The birthday table is inner-joined on ``dw_ek_borger``, so rows without a
    matching birthday are dropped. Age is the whole number of days between
    ``date_of_birth`` and ``timestamp`` divided by 365.25.

    Args:
        df: Frame with at least ``dw_ek_borger`` and ``timestamp`` columns.

    Returns:
        The joined frame with the age column appended.
    """
    with_birthdays = df.join(pl.from_pandas(birthdays()), on="dw_ek_borger", how="inner")

    days_alive = (pl.col("timestamp") - pl.col("date_of_birth")).dt.days()
    return with_birthdays.with_columns((days_alive / 365.25).alias(AGE_COL_NAME))
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from datetime import datetime

# Name of the column that holds the patient's age.
AGE_COL_NAME = "age"
# Lower age bound applied by the minimum-age filter (inclusive).
MIN_AGE = 18
# Lower date bound applied by the minimum-date filter (exclusive).
MIN_DATE = datetime(2014, 1, 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Vi plejer at køre fra 2013/01/01

Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""
Script for obtaining and writing all ambulant psychiatric visits. Handles LPR2 to LPR3 transition and duplicates
"""
from datetime import timedelta

import pandas as pd

from psycop.common.global_utils.sql.loader import sql_load
from psycop.common.global_utils.sql.writer import write_df_to_sql

def get_outpatient_visits_to_psychiatry(write: bool = False) -> pd.DataFrame:
    """Load psychiatric outpatient visits and turn them into prediction times.

    Visits starting after 2012-01-01 with ``psykambbesoeg = 1`` are read from
    SQL. The prediction time is the visit start minus one day, and rows are
    de-duplicated on (``dw_ek_borger``, prediction time).

    Args:
        write: When true, the ``dw_ek_borger`` and ``datotid_predict`` columns
            are also written to SQL, replacing the existing table.

    Returns:
        A frame with the columns ``dw_ek_borger`` and ``timestamp``.
    """
    # Load all physical visits data
    view = "[FOR_besoeg_fysiske_fremmoeder_inkl_2021_feb2022]"
    cols_to_keep = "datotid_start, datotid_slut, dw_ek_borger, psykambbesoeg AS pt_type"

    # The WHERE clause starts with a space so it cannot fuse with the view name.
    sql = "SELECT " + cols_to_keep + " FROM [fct]." + view
    sql += " WHERE datotid_start > '2012-01-01' AND psykambbesoeg = 1"

    df = pd.DataFrame(sql_load(sql, chunksize=None))  # type: ignore

    df[["datotid_start", "datotid_slut"]] = df[["datotid_start", "datotid_slut"]].apply(
        pd.to_datetime
    )

    # Subtract 1 day from datotid_start in ambulant dates because we want to make predictions one day prior to visit
    # Even if it is the first psychiatric contact we would still like to make a prediction before
    # the visit because the patient might have information from somatic visits
    df["datotid_predict"] = df["datotid_start"] - timedelta(days=1)  # type: ignore

    df = df.drop_duplicates(subset=["dw_ek_borger", "datotid_predict"])

    if write:
        ROWS_PER_CHUNK = 5_000

        write_df_to_sql(
            df=df[["dw_ek_borger", "datotid_predict"]],  # type: ignore
            table_name="all_psychiatric_outpatient_visits_processed_2012_2021_ANDDAN_SOMATIC_ADMISSION",
            if_exists="replace",
            rows_per_chunk=ROWS_PER_CHUNK,
        )

    # Set the correct column name here. Other projects do this in a separate
    # function that reads the written table back, perhaps to save time when
    # the dataset is loaded.
    df = df.rename(columns={"datotid_predict": "timestamp"})

    return df[["dw_ek_borger", "timestamp"]]  # type: ignore
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Du kan vist udskifte hele denne funktion med et kald til:

from psycop.common.feature_generation.loaders.raw.load_visits import ambulatory_visits

prediction_times = pl.from_pandas(
            ambulatory_visits(
                timestamps_only=True,
                timestamp_for_output="start",
                n_rows=None,
                return_value_as_visit_length_days=False,
                shak_code=6600,
                shak_sql_operator="=",
            )
        ).with_columns(pl.col("timestamp") - pl.duration(days=1))

ambulatory_visits loaderen henter alle besøg til ambulatoriet, med mulighed for at begrænse til en bestemt shakkode (6600 for psykiatrien)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hej Lasse
Når jeg forsøger at køre koden siger den at 'p1' ikke er defineret. Jeg kan ikke helt gennemskue hvor den skal defineres. kan du hjælpe mig?

Copy link
Contributor

@HLasse HLasse May 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, ja, der mangler at importeres polars. Så tilføj

import polars as pl

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Det fungere strålende - tak :-)



def outpatient_visits_timestamps() -> pd.DataFrame:
# Load the processed psychiatric outpatient visits
view = "[all_psychiatric_outpatient_visits_processed_2012_2021_ANDDAN_SOMATIC_ADMISSION]"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kan se at du har uploaded den df som funktionen ovenfor laver til SQL databasen i dette navn. Jeg vil klart anbefale bare at kalde get_outpatient_visits_to_psychiatry (eller den funktion jeg har skrevet i kommentaren ovenfor) i stedet - det gør det meget nemmere at læse og vedligeholde.

Denne funktion kan virkeligheden fjernes, da den egentlig bare er et kald til get_outpatient_visits_to_psychiatry


sql = "SELECT * FROM [fct]." + view

outpatient_visits = pd.DataFrame(sql_load(sql, chunksize=None)) # type: ignore

outpatient_visits = outpatient_visits.rename(columns={"datotid_predict": "timestamp"})

return outpatient_visits


if __name__ == "__main__":
    # Manual smoke run: load the visits without writing them back to SQL.
    df_test = get_outpatient_visits_to_psychiatry(write=False)
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""
Script for obtaining and writing all somatic emergency contacts
for all admissions from 2012-2022. Handles LPR2 to LPR3 transition, duplicates
and short term readmissions. Note that the trauma center (Traumecenter) is an outpatient unit, so contacts exclusively to it do not count,
which is fine, since patients discharged directly from the trauma center have not been ill (otherwise they would have been admitted).
I do need to pay attention to patients who are admitted because of suicide attempts, though; there will be some of those.
"""
import pandas as pd

from psycop.common.global_utils.sql.loader import sql_load
from psycop.common.global_utils.sql.writer import write_df_to_sql
from psycop.projects.forced_admission_inpatient.cohort.extract_admissions_and_visits.utils.utils import (
concat_readmissions_for_all_patients,
lpr2_lpr3_overlap,
)


def get_contacts_to_somatic_emergency(write: bool = False) -> pd.DataFrame:
# Load contact data
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Vi har også allerede en loader til dette: psycop.common.feature_generation.loaders.raw.load_visits.emergency_visits. Jeg kan se at den kalder et andet view ("[FOR_akutambulantekontakter_psyk_somatik_LPR2_inkl_2021_feb2022]"). Hvis der er uoverenstemmelse mellem de to, ville det være fedt at få dem opdateret så det rigtige view bliver kaldt af emergency_visits.

Igen kan der i kaldet til emergency_visits() specificeres en shakkode så det kun er data fra somatikken du får

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Smart. Jeg har valgt at bruge en anden tabel da jeg egentlig ønsker akutte indlæggelser i somatikken. Har skrevet koden så det passer. Jeg kunne sikkert med fordel lave en loader. Men der vil jeg få brug for hjælp. Spørger næste gang jeg mødet en af jer på Børglumvej.

view = "[FOR_kohorte_indhold_pt_journal_psyk_somatik_inkl_2021_feb2022]"
cols_to_keep = "datotid_start, datotid_slut, dw_ek_borger, pt_type"

# Every appended clause starts with a space so it cannot fuse with the text
# that precedes it.
sql = "SELECT " + cols_to_keep + " FROM [fct]." + view
sql += " WHERE datotid_start > '2012-01-01' AND pt_type = 'Indlagt'"
sql += " AND datotid_start IS NOT NULL AND datotid_slut IS NOT NULL;"

df = pd.DataFrame(sql_load(sql, chunksize=None))  # type: ignore

# Parse both contact boundaries as datetimes, one column at a time.
timestamp_cols = ["datotid_start", "datotid_slut"]
df[timestamp_cols] = df[timestamp_cols].apply(pd.to_datetime)

if write:
    ROWS_PER_CHUNK = 5_000

    # NOTE(review): this is the same table name the psychiatric outpatient
    # visit script writes to, and if_exists="replace" overwrites it - confirm
    # that sharing one table between the two scripts is intended.
    write_df_to_sql(
        df=df[["dw_ek_borger", "datotid_start"]],
        table_name="all_psychiatric_outpatient_visits_processed_2012_2021_ANDDAN_SOMATIC_ADMISSION",
        if_exists="replace",
        rows_per_chunk=ROWS_PER_CHUNK,
    )
Copy link
Contributor

@HLasse HLasse May 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Igen vil jeg anbefale ikke at gøre dette og så i stedet bare kalde denne funktion når du skal bruge det. Der kan hurtigt ske fejl hvis man ændrer lidt i den her kode, men ikke får kørt funktionen igen så man kommer til at bruge en gammel tabel :)


# Set the correct column name here. Other projects call the function below, which does the same thing - perhaps to save time when the dataset is loaded.
df = df.rename(columns={"datotid_start": "timestamp"})

return df[["dw_ek_borger", "timestamp"]] # type: ignore

def admissions_onset_timestamps() -> pd.DataFrame:
    """Read the stored admission onsets from SQL and expose them as ``timestamp``.

    Returns:
        The ``dw_ek_borger`` and ``datotid_start`` columns of the table, with
        ``datotid_start`` renamed to ``timestamp``.
    """
    # NOTE(review): the table name mentions psychiatric outpatient visits even
    # though the somatic admission onsets are read from it - confirm the name.
    view = "[all_psychiatric_outpatient_visits_processed_2012_2021_ANDDAN_SOMATIC_ADMISSION]"
    columns = "dw_ek_borger, datotid_start"
    query = f"SELECT {columns} FROM [fct].{view}"

    onsets = pd.DataFrame(sql_load(query, chunksize=None))  # type: ignore
    return onsets.rename(columns={"datotid_start": "timestamp"})
Copy link
Contributor

@HLasse HLasse May 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Denne kan også slettes :)


if __name__ == "__main__":
    # Manual smoke run: load the admissions without writing them back to SQL.
    get_contacts_to_somatic_emergency(write=False)
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import polars as pl

from psycop.common.cohort_definition import PredictionTimeFilter
from psycop.common.feature_generation.loaders.raw.load_moves import MoveIntoRMBaselineLoader
from psycop.common.model_training_v2.trainer.base_dataloader import BaselineDataLoader
from psycop.common.model_training_v2.trainer.preprocessing.steps.row_filter_other import (
QuarantineFilter,
)
from psycop.projects.AcuteSomaticAdmission.CohortDefinition.add_age import add_age
from psycop.projects.AcuteSomaticAdmission.CohortDefinition.get_somatic_emergency_visits import (
get_contacts_to_somatic_emergency,
)
from psycop.projects.AcuteSomaticAdmission.CohortDefinition.eligible_config import (
AGE_COL_NAME,
MIN_AGE,
MIN_DATE,
)


class SomaticAdmissionMinDateFilter(PredictionTimeFilter):
    """Keep only prediction times that fall strictly after ``MIN_DATE``."""

    def apply(self, df: pl.LazyFrame) -> pl.LazyFrame:
        is_after_min_date = pl.col("timestamp") > MIN_DATE
        return df.filter(is_after_min_date)


class SomaticAdmissionMinAgeFilter(PredictionTimeFilter):
    """Keep only prediction times where the age is at least ``MIN_AGE``."""

    def apply(self, df: pl.LazyFrame) -> pl.LazyFrame:
        # add_age takes an eager DataFrame, so the lazy query is collected
        # first and made lazy again afterwards.
        with_age = add_age(df.collect()).lazy()
        is_old_enough = pl.col(AGE_COL_NAME) >= MIN_AGE
        return with_age.filter(is_old_enough)


class SomaticAdmissionWashoutMove(PredictionTimeFilter):
    """Apply a 365-day quarantine driven by ``MoveIntoRMBaselineLoader``.

    Presumably this removes prediction times within a year of a move into
    the region - confirm against ``QuarantineFilter``.
    """

    def apply(self, df: pl.LazyFrame) -> pl.LazyFrame:
        move_quarantine = QuarantineFilter(
            entity_id_col_name="dw_ek_borger",
            quarantine_timestamps_loader=MoveIntoRMBaselineLoader(),
            quarantine_interval_days=365,
            timestamp_col_name="timestamp",
        )
        return move_quarantine.apply(df)

# NOTE(review): the author was unsure whether the loader below should be used.
class SomaticAdmissionTimestampsLoader(BaselineDataLoader):
    """Expose the somatic admission timestamps as a lazy polars frame."""

    def load(self) -> pl.LazyFrame:
        admissions = get_contacts_to_somatic_emergency()
        return pl.from_pandas(admissions).lazy()


class SomaticAdmissionWashoutPriorSomaticAdmission(PredictionTimeFilter):
    """Apply a 730-day quarantine driven by ``SomaticAdmissionTimestampsLoader``.

    Presumably this removes prediction times within two years of an acute
    somatic admission - confirm against ``QuarantineFilter``.
    """

    def apply(self, df: pl.LazyFrame) -> pl.LazyFrame:
        admission_quarantine = QuarantineFilter(
            entity_id_col_name="dw_ek_borger",
            quarantine_timestamps_loader=SomaticAdmissionTimestampsLoader(),
            quarantine_interval_days=730,
            timestamp_col_name="timestamp",
        )
        return admission_quarantine.apply(df)
Loading