2-data-prep-and-analysis(Python)


You may find this series of notebooks at https://github.com/databricks-industry-solutions/psm. For more information about this solution accelerator, visit https://www.databricks.com/blog/2020/10/20/detecting-at-risk-patients-with-real-world-data.html.

Data preparation and QC

In this notebook we start by creating cohorts based on our study design, using Synthea resources that are already loaded into Delta (by the 1-data-ingest notebook).

0. Initial Setup

First we run the ./config/00-config notebook to configure the project's environment: the base path, Delta Lake path, MLflow experiment, and so on. We then run the ./cohort-builder notebook to import the DeltaEHR class, which is designed to make it easy to create cohorts from Synthea resources.

%run ./config/00-config
%run ./cohort-builder
import json
from pprint import pprint
with open(f'/tmp/{project_name}_configs.json','r') as f:
    settings = json.load(f)
    delta_path = settings['delta_path']
pprint(settings)
{'artifact_location': 'dbfs:/databricks/mlflow-tracking/4233158071187382', 'base_path': '/home/jingting.lu@databricks.com/health-lakehouse', 'data_path': 's3://hls-eng-data-public/data/rwe/dbx-covid-sim/', 'delta_path': '/home/jingting.lu@databricks.com/health-lakehouse/psm/delta', 'experiment_id': '4233158071187382', 'experiment_name': '/Users/jingting.lu@databricks.com/psm', 'tags': {'mlflow.experiment.sourceName': '/Users/jingting.lu@databricks.com/psm', 'mlflow.experimentType': 'MLFLOW_EXPERIMENT', 'mlflow.ownerEmail': 'jingting.lu@databricks.com', 'mlflow.ownerId': '5215232244814299'}}

Now we specify our experiment's main parameters: the target medication (the intervention under study), the target event (the event defining the cohort entry date), and the target outcome (the outcome under study).

target_params = {
  # set the target drug med code
  'target_med_code':20134224, # databrixovir  
  # set the target drug name
  'target_med_name':'databrixovir',
  # set the target event code
  'target_event_code':840539006,
  # set the target event name
  'target_event_name':'covid',
  # set the target outcome
  'target_outcome' : 'admission',
  'target_outcome_code': 1505002
}

We would also like to include information on past history of comorbidities (for example, obesity) that can later be used in our propensity score matching.

comorbidities = {
'obesity':[162864005,408512008],
'hypertension':[59621000],
'heart_disease':[53741008], 
'diabetes':[44054006],
'smoking':[449868002],
}
import mlflow
mlflow.set_experiment(settings['experiment_name'])
with mlflow.start_run(run_name='cohort-creation'):
  mlflow.log_params({**comorbidities, **target_params})

1. Cohort creation

To make cohort creation and ETL easier, we created the DeltaEHR class, which simplifies data pre-processing. Using its methods, we create target and outcome cohorts based on our inclusion/exclusion criteria. We start by adding our target cohort (patients diagnosed with covid) and outcome cohort (patients who have been admitted to the hospital). Each cohort is then automatically added to the collection of cohorts. Each cohort contains three columns: patient id (PATIENT), cohort start index date (START), and cohort exit date (STOP).

delta_ehr=DeltaEHR(delta_path)
delta_ehr.add_simple_cohort(cohort_name='covid',resource='conditions',inclusion_criteria=f"CODE=={target_params['target_event_code']}")
delta_ehr.add_simple_cohort(cohort_name='admission',resource='encounters',inclusion_criteria=f"REASONCODE == {target_params['target_event_code']} AND CODE == {target_params['target_outcome_code']}")
delta_ehr.add_simple_cohort(cohort_name='deceased',resource='patients',inclusion_criteria="DEATHDATE is not null",start_col='DEATHDATE',stop_col='DEATHDATE')
target_cohort=delta_ehr.cohorts['covid']
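The internals of DeltaEHR are not shown in this notebook, so the following is only a minimal pure-Python sketch of the idea behind a simple cohort: filter a resource by an inclusion criterion and keep PATIENT, START and STOP for each qualifying patient. The helper simple_cohort, the toy rows, and the choice to keep the earliest qualifying record per patient are all assumptions made for illustration, not the class's actual behavior.

```python
from datetime import date

# Toy condition records; column names mirror the Synthea-style tables,
# but the rows themselves are made up for illustration.
conditions = [
    {"PATIENT": "p1", "CODE": 840539006, "START": date(2020, 3, 8), "STOP": None},
    {"PATIENT": "p1", "CODE": 840539006, "START": date(2020, 3, 20), "STOP": None},
    {"PATIENT": "p2", "CODE": 44054006, "START": date(2015, 6, 1), "STOP": None},
    {"PATIENT": "p3", "CODE": 840539006, "START": date(2020, 3, 2), "STOP": date(2020, 3, 30)},
]


def simple_cohort(records, predicate):
    """Hypothetical helper: per patient, keep the earliest record matching `predicate`."""
    cohort = {}
    for rec in records:
        if not predicate(rec):
            continue
        current = cohort.get(rec["PATIENT"])
        # Assumption: the earliest qualifying record defines the index date.
        if current is None or rec["START"] < current["START"]:
            cohort[rec["PATIENT"]] = {k: rec[k] for k in ("PATIENT", "START", "STOP")}
    return sorted(cohort.values(), key=lambda r: r["PATIENT"])


covid_cohort = simple_cohort(conditions, lambda r: r["CODE"] == 840539006)
for row in covid_cohort:
    print(row)
```

Each resulting row has exactly the three cohort columns described above; patients with no qualifying record simply do not appear in the cohort.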

Next we specify which demographic information to include in the dataset.

delta_ehr.set_patient_list(demog_list=['BIRTHDATE','MARITAL','RACE','ETHNICITY','GENDER'])

Now we add cohorts based on prior events (comorbidities, drug exposure, etc.). For each comorbid condition of interest, we choose a window of time to look back for any record of a diagnosis of, or exposure to, that condition. This is done with the add_cohort method defined in the DeltaEHR class. The method also lets you specify a washout window (gate_window) as a buffer for cases where we want to ensure that the effects of treatments do not interfere. For example, if it is set to 10 days, a comorbidity diagnosis recorded within 10 days of the target event is not included. Note that if you specify a negative value for the washout window, you can include events occurring after the target event (see below).

for event,codes in list(comorbidities.items()):
  delta_ehr.add_cohort('conditions', event, codes,3*365,10, 'covid')
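The look-back and washout windows can be illustrated with a small date check. This is a sketch under stated assumptions, not the add_cohort implementation: the function in_window is hypothetical, both windows are assumed to be measured in days relative to the index date, and both boundaries are assumed to be inclusive.

```python
from datetime import date, timedelta


def in_window(event_date, index_date, hist_window, gate_window):
    """Hypothetical check: is the event in [index - hist_window, index - gate_window]?"""
    earliest = index_date - timedelta(days=hist_window)
    latest = index_date - timedelta(days=gate_window)
    return earliest <= event_date <= latest


index_date = date(2020, 3, 8)  # made-up covid diagnosis date

# History look-back: up to 3 years back, excluding the 10 days before the index date.
old_diagnosis = in_window(date(2019, 1, 15), index_date, 3 * 365, 10)
recent_diagnosis = in_window(date(2020, 3, 3), index_date, 3 * 365, 10)

# Negative washout: hist_window=0 and gate_window=-10 cover index .. index + 10 days.
early_exposure = in_window(date(2020, 3, 12), index_date, 0, -10)
late_exposure = in_window(date(2020, 3, 25), index_date, 0, -10)

print(old_diagnosis, recent_diagnosis, early_exposure, late_exposure)
```

Under these assumptions, a diagnosis five days before the index date falls inside the washout and is dropped, while a negative gate_window shifts the end of the window past the index date so that events after diagnosis qualify.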

Now we add the cohort of patients who received the target treatment within 10 days of being diagnosed with covid. This is done the same way as adding cohorts based on historic events, except that in this case we set hist_window=0 and gate_window=-10.

delta_ehr.add_cohort('medications', target_params['target_med_name'], target_params['target_med_code'], 0,-10, 'covid')

Optionally, you can also add cohorts corresponding to other treatments, for example:

from pyspark.sql.functions import desc

meds_test=(
  delta_ehr.tables['medications'].filter("to_date(START) > to_date('2020-01-01')")
  .join(
    delta_ehr.cohorts['covid'].select('PATIENT'),on='PATIENT')
  .join(delta_ehr.cohorts['admission'].select('PATIENT'),on='PATIENT')
  .groupBy('CODE','DESCRIPTION')
  .count()
  .orderBy(desc('count'))
  .limit(20)
  .collect()
)

medications={f"m_{m['CODE']}":m['CODE'] for m in meds_test}
for med,codes in list(medications.items()):
  delta_ehr.add_cohort('medications', med, codes,0,-10, 'covid')
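The selection logic in the cell above (restrict to patients in both reference cohorts, count records per medication code, keep the most frequent codes, then name each cohort m_<code>) can be sketched without Spark. The rows, patient ids and codes below are placeholders invented for illustration, and the sketch keeps the top 2 codes rather than 20.

```python
from collections import Counter

# Made-up medication records; CODE values are placeholders, not real drug codes.
medication_rows = [
    {"PATIENT": "p1", "CODE": 111},
    {"PATIENT": "p2", "CODE": 111},
    {"PATIENT": "p1", "CODE": 111},
    {"PATIENT": "p1", "CODE": 222},
    {"PATIENT": "p2", "CODE": 222},
    {"PATIENT": "p2", "CODE": 333},
    {"PATIENT": "p3", "CODE": 333},
    {"PATIENT": "p3", "CODE": 333},
    {"PATIENT": "p3", "CODE": 333},
]

# Set intersection stands in for the two inner joins on PATIENT.
covid_patients = {"p1", "p2", "p3"}
admitted_patients = {"p1", "p2"}
eligible = covid_patients & admitted_patients

# Count records per code among eligible patients only.
code_counts = Counter(
    row["CODE"] for row in medication_rows if row["PATIENT"] in eligible
)

# Keep the most frequent codes and build the cohort-name -> code mapping.
top_codes = [code for code, _ in code_counts.most_common(2)]
medications = {f"m_{code}": code for code in top_codes}
print(medications)
```

Note that code 333 is the most frequent overall in this toy data but drops out once the records are restricted to patients who are in both cohorts, which is the effect the two joins have in the Spark version.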

We can also add a cohort of patients experiencing other symptoms, such as blood clots, within 20 days of diagnosis with covid.

blood_clot={"blood_clot":234466008}
delta_ehr.add_cohort('conditions','blood_clot',234466008, 0, -20,'covid')

2. Create dataset

Now we create the final dataset for our downstream analysis. One of the methods in DeltaEHR is combine_all_cohorts(). This method combines all cohort information into indicator columns (cohort membership indicators), together with the cohort index date corresponding to each patient id.

data_df=delta_ehr.combine_all_cohorts()
data_df.createOrReplaceTempView("delta_ehr_cohorts")
display(data_df)
 
PATIENT | BIRTHDATE | MARITAL | RACE | ETHNICITY | GENDER | max_START | min_STOP | covid_START | is_covid
00405fb9-b44f-4ef5-a92d-a0c3d7ca2607 | 1947-11-27 | M | white | nonhispanic | M | 2020-03-08 | 1966-01-20 | 2020-03-08 | 1
0045e6ab-931c-44f7-8f8e-28e2b253e519 | 1936-04-13 | M | white | nonhispanic | M | 2020-03-04 | 1952-05-27 | 2020-03-03 | 1
0053c053-09b3-4089-b840-75f3a2e14ac6 | 1969-06-22 | M | white | nonhispanic | F | 2020-03-29 | 1987-08-16 | 2020-03-02 | 1
00d38be8-920a-4e66-92c8-65fe41c5fc4e | 1969-12-03 | S | white | nonhispanic | F | 2003-12-10 | 1988-01-27 | null | 0
00e3e25d-7cdc-484e-ac65-aee7396dae0b | 1985-02-18 | M | white | nonhispanic | F | 2020-03-06 | 2003-04-14 | 2020-03-06 | 1
00ebd85e-d2a8-41fd-9e7c-c85cd9d288b3 | 2005-05-18 | null | white | nonhispanic | F | 2020-05-07 | 2006-09-11 | 2020-03-04 | 1
0120542e-62f7-4b60-8663-f07524b74b7e | 2005-05-15 | null | white | nonhispanic | M | 2020-05-24 | 2006-02-16 | 2020-03-09 | 1
01358bd4-cdcb-4dee-86ac-5e8871fb2978 | 1995-03-22 | null | white | nonhispanic | F | 2020-01-20 | 1998-04-11 | null | 0
0168f34a-571d-4251-8814-8fb0781e03ba | 1979-05-06 | M | black | nonhispanic | M | 2020-03-11 | 2012-12-30 | 2020-03-11 | 1
01c63fc5-0293-462b-925a-d497bd6b270a | 1916-05-11 | M | white | nonhispanic | M | 1995-07-20 | 1934-07-05 | null | 0
01daf49b-0d81-4193-8482-51aa2a5fb60c | 1993-08-18 | null | white | nonhispanic | F | 2020-05-06 | 2018-06-26 | 2020-02-22 | 1
022e8034-9294-4ab0-ada2-6ace6697913f | 1990-09-24 | M | white | hispanic | M | 2020-03-06 | 1992-09-30 | 2020-03-06 | 1
02cd1a6d-3613-457b-b9fe-ad18a57affc2 | 1939-03-21 | M | white | nonhispanic | M | 1992-09-30 | 1972-05-30 | null | 0
037280d2-4487-4cf4-902c-a6309b197f56 | 1994-01-12 | null | asian | nonhispanic | F | 2020-04-23 | 1994-01-13 | 2020-03-08 | 1
03737c83-c8f3-4adf-b47e-ed9a5206e334 | 1997-10-04 | null | white | hispanic | M | 2020-03-09 | 2015-11-23 | 2020-03-09 | 1
03859286-ef09-4ad3-bf24-76567efbd2e3 | 2005-10-24 | null | white | nonhispanic | M | 2020-03-04 | 2006-04-23 | 2020-03-03 | 1
03d380b2-cd17-411b-88d2-50ccca795c87 | 1968-10-13 | S | white | nonhispanic | M | 2020-03-02 | 2018-09-23 | 2020-03-02 | 1
1,000 rows | Truncated data
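The shape of the combined dataset (one row per patient, with an is_<cohort> indicator and a <cohort>_START date per cohort) can be sketched in plain Python. This is an illustration of the output layout only: combine_cohorts is a hypothetical stand-in, not the DeltaEHR method, and the patients and dates are invented.

```python
from datetime import date

# Hypothetical cohorts: cohort name -> {patient id -> cohort start date}.
cohorts = {
    "covid": {"p1": date(2020, 3, 8), "p3": date(2020, 3, 2)},
    "hypertension": {"p1": date(2018, 5, 1)},
}
patients = ["p1", "p2", "p3"]


def combine_cohorts(patients, cohorts):
    """Hypothetical helper: one row per patient with <cohort>_START and is_<cohort>."""
    rows = []
    for pid in patients:
        row = {"PATIENT": pid}
        for name, members in cohorts.items():
            row[f"{name}_START"] = members.get(pid)  # None when not a member
            row[f"is_{name}"] = int(pid in members)  # 0/1 membership indicator
        rows.append(row)
    return rows


wide = combine_cohorts(patients, cohorts)
for row in wide:
    print(row)
```

As in the displayed output, a patient who is not in a cohort gets a null start date and an indicator of 0 for that cohort.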

3. Exploratory Analysis

Now let's take a look at the dataset and look at trends, such as number of individuals diagnosed with the target condition (covid) over time, and demographic trends and other statistics of interest.

covid_counts_by_age_df= sql("""
  SELECT covid_START, 20*cast(age_at_covid/20 as integer) as age_band, count(*) as count
  FROM delta_ehr_cohorts
  WHERE is_covid == 1
  group by 1, 2
  order by covid_START
""")
display(covid_counts_by_age_df)
 
covid_START | age_band | count
2020-01-14 | 20 | 1
2020-01-15 | 20 | 1
2020-01-18 | 0 | 1
2020-01-18 | 20 | 1
2020-01-19 | 60 | 1
2020-01-19 | 40 | 1
2020-01-20 | 40 | 1
2020-01-20 | 60 | 1
2020-01-21 | 80 | 1
2020-01-22 | 0 | 2
2020-01-22 | 40 | 2
2020-01-22 | 20 | 3
2020-01-22 | 60 | 1
2020-01-23 | 40 | 2
2020-01-24 | 80 | 1
2020-01-24 | 60 | 4
2020-01-24 | 0 | 2
385 rows
import plotly.express as px
df = covid_counts_by_age_df.toPandas()
fig = px.bar(df, x="covid_START", color='age_band',barmode='stack',y="count")
fig.show()
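The age_band expression in the query above, 20*cast(age_at_covid/20 as integer), maps each age to the lower edge of its 20-year band by truncating the quotient. A minimal Python equivalent, assuming non-negative ages (the function name age_band and the sample ages are illustrative):

```python
def age_band(age, width=20):
    """Lower edge of the `width`-year band containing a non-negative `age`."""
    # int() truncates toward zero, like casting a double to integer in Spark SQL.
    return width * int(age / width)


sample_ages = (5, 19.9, 20, 47, 83)
bands = [age_band(age) for age in sample_ages]
for age, band in zip(sample_ages, bands):
    print(age, "->", band)
```

Ages 0 to just under 20 land in band 0, ages 20 to just under 40 in band 20, and so on, which is why the age_band column only takes the values 0, 20, 40, 60 and 80 in the output above.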
%sql
SELECT race, avg(is_hypertension) as hypertension_frequency
FROM delta_ehr_cohorts
GROUP BY race
order by 2
 
race | hypertension_frequency
native | 0.00482315112540193
other | 0.007518796992481203
white | 0.009165233598170796
black | 0.010943649803206298
asian | 0.011006835824354072
5 rows
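The query above relies on the fact that the average of a 0/1 indicator within a group equals the fraction of that group with the condition. A small sketch with made-up rows (not taken from the dataset) shows the same computation in plain Python:

```python
from collections import defaultdict

# Made-up rows: (race, is_hypertension).
rows = [
    ("white", 1), ("white", 0), ("white", 0), ("white", 0),
    ("asian", 1), ("asian", 0),
]

# group -> [sum of indicator, row count]
totals = defaultdict(lambda: [0, 0])
for group, flag in rows:
    totals[group][0] += flag
    totals[group][1] += 1

# Mean of the indicator per group, i.e. the share of rows with flag == 1.
frequency = {group: s / n for group, (s, n) in totals.items()}
print(frequency)
```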

Now let's look at some of the trends in reported blood clot occurrences, such as the distribution of blood clots among covid patients versus others, the distribution of the timeframe within which a blood clot is reported, and demographic patterns that may be correlated with these timeframes.

%sql
SELECT is_covid,
       sum(is_blood_clot) as count
FROM delta_ehr_cohorts
GROUP BY is_covid
 
is_covid | count
0 | 0
1 | 1389
2 rows
%sql

SELECT gender,
       age_at_blood_clot,
       count(*) as count
FROM delta_ehr_cohorts
WHERE is_blood_clot == 1
GROUP BY age_at_blood_clot, gender
ORDER BY age_at_blood_clot
 
gender | age_at_blood_clot | count
F | 5 | 1
F | 8 | 1
F | 18 | 1
F | 19 | 2
M | 19 | 2
M | 20 | 1
M | 21 | 4
F | 21 | 3
F | 22 | 3
M | 22 | 1
M | 23 | 3
F | 23 | 1
M | 24 | 6
F | 24 | 1
M | 25 | 5
M | 26 | 8
M | 27 | 8
179 rows

Now let's compare admission rates among patients who have received the treatment and those who have not.

data_df.filter(f"is_{target_params['target_event_name']}==1")\
       .groupBy(f"is_{target_params['target_med_name']}")\
       .agg(
          avg(f"is_{target_params['target_outcome']}").alias('admission_probability'))\
       .display()
 
is_databrixovir | admission_probability
0 | 0.18618367583884826
1 | 0.1455843551140773
2 rows

As we see, overall admission rates are lower among those who received the target treatment. However, this comparison can be confounded by many factors. For example, it may be that younger patients are more likely to receive the treatment and, being young, are also less likely to be admitted to the hospital. In that case we cannot attribute the lower admission rates to the treatment. In the next notebook, we use propensity score matching to correct for such confounding factors. But first, let's write the resulting dataset to the Delta silver layer.
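To make the confounding concern concrete, here is a toy calculation with invented counts (they are not results from this dataset). In this constructed example the treatment has no effect within either age group, yet the crude comparison suggests a benefit, simply because younger patients are both treated more often and admitted less often.

```python
# (age_group, treated) -> (patients, admissions); invented counts for illustration only.
counts = {
    ("young", 1): (80, 8),
    ("young", 0): (20, 2),
    ("old", 1): (20, 6),
    ("old", 0): (80, 24),
}


def admission_rate(keys):
    """Pooled admission rate over the given (age_group, treated) cells."""
    patients = sum(counts[k][0] for k in keys)
    admitted = sum(counts[k][1] for k in keys)
    return admitted / patients


# Crude rates: pool all age groups within each treatment arm.
crude = {t: admission_rate([k for k in counts if k[1] == t]) for t in (0, 1)}

# Stratified rates: one rate per (age_group, treated) cell.
stratified = {k: admission_rate([k]) for k in counts}

print("crude:", crude)
print("stratified:", stratified)
```

Here the crude rates differ between the treated and untreated arms, while the rates within each age group are identical. Comparing patients with similar covariates, which is what matching aims to do, removes this spurious difference.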

4. Write final dataset to Delta

Now we write the resulting dataset back to Delta Lake for our next analysis, which looks specifically at the effect of databrixovir on hospital admissions.

data_df.write.mode('overwrite').option("overwriteSchema", "true").save(f"{delta_path}/silver/patient_data")