# parkinsons.R ----------------------------------------------------------------------------------------------------
# Implements the tiered Parkinson's Disease algorithm.
# By Fabrício Kury
# 2023/2/8 20:19
#
library(wrap)
library(credx)
suppressPackageStartupMessages(library(dplyr))

# Control ---------------------------------------------------------------------------------------------------------
# do_inject_sql: when TRUE, the "Inject SQL" section at the bottom of this script runs.
# phea_cdm: schema name passed to setup_phea(); also the name replaced by @cdm_database_schema in the final SQL.
do_inject_sql <- TRUE
phea_cdm <- 'cdm_minnesota3'

# Download cohort from atlas-demo ---------------------------------------------------------------------------------
# Exports the cohort definition set for `cohortId` from the OHDSI demo WebAPI and saves it under `out_dir`:
#   <out_dir>/inst/Cohorts.csv          settings file
#   <out_dir>/inst/cohorts              JSON definitions
#   <out_dir>/inst/sql/sql_server       SQL files, named after the cohort id
# Arguments:
#   cohortId  cohort id(s) handed to ROhdsiWebApi::exportCohortDefinitionSet()
#   out_dir   root directory of the saved files
# Returns whatever CohortGenerator::saveCohortDefinitionSet() returns.
download_atlas_demo_cohort <- function(cohortId, out_dir) {
  # cohortId <- 1781786 # 1781767 # 1769430 #c(1778211,1778212,1778213)
  # Get the SQL/JSON for the cohorts
  cohortDefinitionSet <- ROhdsiWebApi::exportCohortDefinitionSet(
    baseUrl = "http://api.ohdsi.org:8080/WebAPI",
    cohortIds = cohortId)

  CohortGenerator::saveCohortDefinitionSet(
    cohortDefinitionSet = cohortDefinitionSet,
    settingsFileName = paste0(out_dir, "/inst/Cohorts.csv"),
    jsonFolder = paste0(out_dir, "/inst/cohorts"),
    sqlFolder = paste0(out_dir, "/inst/sql/sql_server"),
    cohortFileNameFormat = "%s",
    cohortFileNameValue = c("cohortId"))
}

# Generate phenotype ----------------------------------------------------------------------------------------------
# Builds the Phea phenotype as a lazy query and returns it with two columns: person_id and visit_occurrence_id,
# restricted to rows where the count of PD records exceeds the count of non-PD records (is_pd == 1).
# Side effects:
#   - attaches the phea package and opens a connection through dbConnectFort();
#   - assigns `codesets` into the global environment, because the "Inject SQL" section renders it later.
# The connection is not closed here: the returned lazy query is rendered by the caller afterwards.
generate_phea_phenotype <- function() {
  # Prepare ---------------------------------------------------------------------------------------------------------
  library(phea)
  con <- dbConnectFort()
  setup_phea(con, phea_cdm, compatibility_mode = TRUE)

  # Codesets --------------------------------------------------------------------------------------------------------
  # Empty inline stand-in for the #Codesets table; its rendered SQL is later swapped for "select * from #Codesets".
  .GlobalEnv$codesets <- tibble(codeset_id = integer(), concept_id = integer()) |>
    dbplyr::copy_inline(con = phea:::.pheaglobalenv$con)

  # Components ------------------------------------------------------------------------------------------------------
  # window_date_expr <- "date('3 years')"

  # PD
  pd <- sqlt(condition_occurrence) |>
    inner_join(by = c('condition_concept_id' = 'concept_id'),
      codesets |>
        filter(codeset_id == 0) |>
        select(concept_id)) |>
    make_record_source(
      .pid = person_id,
      .ts = condition_start_datetime) |>
    make_component(
      # window = window_date_expr,
      fn = list(
        condition_occurrence_id = 'count'))

  # neurodegenerative PD (8) and secondary parkinsonism (10)
  non_pd <- sqlt(condition_occurrence) |>
    inner_join(by = c('condition_concept_id' = 'concept_id'),
      codesets |>
        filter(codeset_id %in% c(8, 10)) |>
        select(concept_id)) |>
    make_record_source(
      .pid = person_id,
      .ts = condition_start_datetime) |>
    make_component(
      # window = window_date_expr,
      fn = list(
        condition_occurrence_id = 'count'))

  visits <- sqlt(visit_occurrence) |>
    make_record_source(
      .pid = person_id,
      .ts = visit_start_datetime) |>
    make_component(
      # window = window_date_expr
      )

  # Phenotype -------------------------------------------------------------------------------------------------------
  phen <- calculate_formula(
    components = list(
      a = pd,
      b = non_pd,
      v = visits
    ),
    fml = list(
      layer_1 = list(
        pd_count = 'a_condition_occurrence_id',
        non_pd_count = 'b_condition_occurrence_id'),
      layer_2 = list(
        is_pd = "case when pd_count > non_pd_count then 1 else 0 end")
    ),
    dates_from = 'v',
    export = 'v_visit_occurrence_id',
    calculate_window = FALSE
  ) |>
    filter(is_pd == 1) |>
    select(person_id = pid, visit_occurrence_id = v_visit_occurrence_id)

  # The three `if (FALSE)` blocks below are disabled scratch code kept for reference; they never run.
  # NOTE(review): they reference columns (is_pd, ts, pid) that the select() above has already dropped from `phen`.
  if (FALSE) {
    phen_2 <- phen |>
      mutate(kco = sql('is_pd = 1')) |>
      keep_change_of(
        of = 'kco',
        order = 'ts',
        partition = 'pid')
  }

  if (FALSE) {
    # NOTE(review): `era_sql` is only defined in the next disabled block.
    phen |>
      mutate(kcf_var = sql(era_sql)) |>
      keep_change_of('kcf_var', order = 'ts', partition = 'pid') |>
      filter(pid == 5153) |>
      select(-kcf_var) |>
      group_by(pid) |>
      summarise(
        lines = n()) |>
      arrange(desc(lines))
  }

  if (FALSE) {
    era_sql <- 'is_pd = 1'

    # Turns the rows of `board` where the SQL `condition` changes value into eras (start_date, end_date) per
    # person, then fills open-ended eras with the end date of the matching observation period.
    make_era_of <- function(board, condition, order, partition) {
      phea_con <- phea:::.pheaglobalenv$con

      res <- board |>
        # dplyr::compute(board) |>
        mutate(phea_meo = sql(condition)) |>
        keep_change_of(
          of = 'phea_meo',
          order = order,
          partition = partition) |>
        select(!!partition, !!order, phea_meo)

      res_cpt <- res # dplyr::compute(res)

      # Rows where the condition turns TRUE open an era; rows where it turns FALSE close one.
      entry <- res_cpt |>
        filter(phea_meo) |>
        mutate(start_date = !!sym(order)) |>
        select(-phea_meo)

      exit <- res_cpt |>
        filter(!phea_meo) |>
        mutate(end_date = !!sym(order)) |>
        select(-phea_meo)

      sql_txt <- paste0('min(', DBI::dbQuoteIdentifier(phea_con, 'end_date'), ')')

      # For every row, take the earliest end_date among the current and following rows of the partition.
      res2 <- union_all(entry, exit) |>
        dbplyr::window_order(!!sym(partition), !!sym(order)) |>
        mutate(
          end_date = dbplyr::win_over(
            con = phea_con,
            expr = dplyr::sql(sql_txt),
            partition = partition,
            order = order,
            frame = c(0, Inf)))

      res2_cpt <- res2 |>
        select(-!!sym(order)) # |>
        # dplyr::compute()

      res3 <- res2_cpt |>
        filter(!is.na(start_date)) |>
        select(
          person_id = !!sym(partition),
          start_date,
          end_date)

      res4 <- list(
          a = res3,
          b = sqlt(observation_period)) |>
        sqla("
          select a.*, b.observation_period_end_date
          from a
          inner join b
          on a.person_id = b.person_id
            and a.start_date >= b.observation_period_start_date
            and a.start_date <= b.observation_period_end_date") |>
        mutate(
          # start_date = coalesce(start_date, OBSERVATION_PERIOD_START_DATE),
          end_date = coalesce(end_date, observation_period_end_date)) |>
        select(person_id, start_date, end_date)

      res4
    }

    phen3 <- phen |>
      make_era_of(
        condition = era_sql,
        order = 'ts',
        partition = 'pid')

    browser()
  }

  phen
}

# Inject SQL ------------------------------------------------------------------------------------------------------
# Downloads the Atlas cohort SQL, replaces the placeholder visit-codeset join with the rendered Phea phenotype,
# maps the inline codesets query back to #Codesets and the concrete schema back to @cdm_database_schema, then
# overwrites the downloaded SQL file with the result.
if (do_inject_sql) {
  cohort_id <- 1781786
  cohort_dir <- paste0('./cohort', cohort_id)
  download_atlas_demo_cohort(cohort_id, cohort_dir)

  circe_sql_file <- paste0(cohort_dir, "/inst/sql/sql_server/", cohort_id, ".sql")
  # NOTE(review): computed but not used below; the result is written over circe_sql_file instead.
  new_circe_sql_file <- gsub('\\.sql$', '_phea.sql', circe_sql_file)
  circe_sql <- readLines(circe_sql_file) |>
    paste0(collapse = '\n')

  phen <- generate_phea_phenotype()
  # browser()

  placeholder_visit_codeset_id <- 11
  # "\\(.+select vo\\.*.+FROM @cdm_database_schema\\.VISIT_OCCURRENCE vo.+",
  # "JOIN #Codesets cs on \\(vo\\.visit_concept_id = cs\\.concept_id and cs\\.codeset_id = 11\\).+\\) C"
  placeholder_pattern <- paste0(
    "JOIN #Codesets cs on \\(vo\\.visit_concept_id = cs\\.concept_id and cs\\.codeset_id = ",
    placeholder_visit_codeset_id, "\\)")

  # Fail loudly rather than overwrite the file and report success when there is nothing to replace.
  if (!grepl(placeholder_pattern, circe_sql)) {
    stop(
      'Placeholder join on codeset ', placeholder_visit_codeset_id, ' not found in ', circe_sql_file, '.',
      call. = FALSE)
  }

  # gsub() interprets backslashes in `replacement`; double them so the rendered SQL is inserted verbatim.
  phen_sql <- dbplyr::sql_render(phen) |>
    as.character() |>
    gsub(pattern = '\\', replacement = '\\\\', fixed = TRUE)

  injected_sql <- gsub(
    pattern = placeholder_pattern,
    replacement = paste0("JOIN (", phen_sql, ") phea_phen on ",
      "(vo.person_id = phea_phen.person_id and vo.visit_occurrence_id = phea_phen.visit_occurrence_id)"),
    x = circe_sql)

  # Turn the rendered inline codesets query into a regex that tolerates any whitespace layout:
  # collapse whitespace runs, escape regex metacharacters, then let every space match \s+.
  codesets_reference <- dbplyr::sql_render(codesets) |>
    as.character() |>
    gsub(
      pattern = '\\s+',
      replacement = ' ') |>
    gsub(
      pattern = '([.|()\\^{}+$*?]|\\[|\\])',
      replacement = '\\\\\\1') |>
    gsub(
      pattern = ' ',
      replacement = '\\s+',
      fixed = TRUE)

  codesets_replacement <- "select * from #Codesets"

  injected_sql <- gsub(
    pattern = codesets_reference,
    replacement = codesets_replacement,
    x = injected_sql)

  # Match the schema configured in `phea_cdm`, optionally wrapped in ", ` or ' quotes.
  schema_reference <- paste0('("|`|\')?', phea_cdm, '("|`|\')?')
  schema_replacement <- "@cdm_database_schema"

  injected_sql <- gsub(
    pattern = schema_reference,
    replacement = schema_replacement,
    x = injected_sql)

  message('Overwriting ', circe_sql_file)
  writeLines(injected_sql, circe_sql_file)

  message('Phenotype SQL injection completed.')
}