library(data.table)

chunk_size <- 10000
csv_directory <- "/home/acumenus/GitHub/synthea/output/csv"

# List all CSV files in the directory (pattern is a regex, so escape the dot)
csv_files <- list.files(csv_directory, pattern = "\\.csv$", full.names = TRUE)

# Connection details and ETL parameters; these are constants, so set them once
# here rather than inside the per-chunk function
cd <- DatabaseConnector::createConnectionDetails(
  dbms         = "postgresql",
  server       = "localhost/omop",
  user         = "postgres",
  password     = "acumenus",
  port         = 5440,
  pathToDriver = "/home/acumenus/GitHub/drivers"
)

cdmSchema      <- "synthea"
cdmVersion     <- "5.4"
syntheaVersion <- "3.0.0"
syntheaSchema  <- "native"
syntheaFileLoc <- "/home/acumenus/GitHub/synthea/output/csv"
vocabFileLoc   <- "/home/acumenus/GitHub/synthea/vocabulary_v5_latest"

# One-time setup: create the CDM and Synthea staging tables and load the
# vocabulary. These calls were originally inside process_chunk(), which would
# have recreated the tables and rerun the whole ETL once per chunk.
ETLSyntheaBuilder::CreateCDMTables(connectionDetails = cd, cdmSchema = cdmSchema, cdmVersion = cdmVersion)
ETLSyntheaBuilder::CreateSyntheaTables(connectionDetails = cd, syntheaSchema = syntheaSchema, syntheaVersion = syntheaVersion)
ETLSyntheaBuilder::LoadVocabFromCsv(connectionDetails = cd, cdmSchema = cdmSchema, vocabFileLoc = vocabFileLoc)

# ETLSyntheaBuilder::LoadSyntheaTables(connectionDetails = cd, syntheaSchema = syntheaSchema, syntheaFileLoc = syntheaFileLoc)
# would load every CSV in a single pass; the chunked loop below replaces it for
# files that are too large to read at once.

# Function to process each chunk
process_chunk <- function(chunk) {
  # Your data processing code here, e.g. appending the chunk to the matching
  # staging table (see the sketch at the end of this script)
}

# Loop through all CSV files and read them in chunks
for (file_path in csv_files) {
  # Get the column names from the header row
  columns_dt <- fread(file_path, nrows = 0)
  columns <- colnames(columns_dt)

  # Count the number of data lines in the file, excluding the header
  # (line-based chunking assumes no embedded newlines inside quoted fields)
  file_lines <- length(readLines(file_path)) - 1

  # Define the starting positions of each chunk
  chunk_starts <- seq(from = 1, to = file_lines, by = chunk_size)

  # Read and process each chunk; skip = chunk_start skips the header row plus
  # all previously read data rows, and header = FALSE stops fread from
  # mistaking the first row of a chunk for a header
  for (i in seq_along(chunk_starts)) {
    chunk_start <- chunk_starts[i]
    chunk_end <- min(chunk_start + chunk_size, file_lines + 1)
    chunk <- fread(file_path, skip = chunk_start, nrows = chunk_end - chunk_start,
                   header = FALSE, col.names = columns)
    process_chunk(chunk)
  }
}

# Build the CDM event tables once all staging data has been loaded
ETLSyntheaBuilder::LoadEventTables(connectionDetails = cd, cdmSchema = cdmSchema,
                                   syntheaSchema = syntheaSchema, cdmVersion = cdmVersion,
                                   syntheaVersion = syntheaVersion)
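
# --- Sketch: a concrete process_chunk() --------------------------------------
# A minimal sketch, not part of the original script, of how process_chunk()
# could append each chunk to the matching staging table using
# DatabaseConnector::insertTable(). Assumptions: each staging table created by
# CreateSyntheaTables() shares its name with the CSV file (e.g. patients.csv ->
# native.patients), and the installed DatabaseConnector version supports the
# databaseSchema argument of insertTable().
process_chunk <- function(chunk, file_path) {
  # Derive the staging table name from the file name, e.g. "patients.csv" -> "patients"
  table_name <- tools::file_path_sans_ext(basename(file_path))

  conn <- DatabaseConnector::connect(cd)
  on.exit(DatabaseConnector::disconnect(conn))

  # Append the chunk; the table already exists, so do not drop or recreate it
  DatabaseConnector::insertTable(
    connection        = conn,
    databaseSchema    = syntheaSchema,
    tableName         = table_name,
    data              = chunk,
    dropTableIfExists = FALSE,
    createTable       = FALSE
  )
}
# With this signature the loop would call process_chunk(chunk, file_path).
# Opening one connection per chunk is simple but slow; for large loads, connect
# once before the loop and pass the connection in instead.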