OHDSI Home | Forums | Wiki | Github

Error in Databricks using "copy into" SQL command to populate table from csv file (string is not converted to integer)

I have a need to insert data into a Databricks table using data from a csv file.

I’ve uploaded the file to Databricks, but when I try to use it to insert data into the table using copy into, I get an error that the command is not interpreting the column in the csv file as an integer, but as a string and it is not casting that string to an int. The file was uploaded using the REST API.

How do I get Databricks to write these data to the table?

This is the code I’m currently using.

COPY INTO concept
  FROM '/FileStore/tables/prod//ohdsi/demo_cdm/concept/concept.csv'
  FILEFORMAT = CSV
  FORMAT_OPTIONS ('mergeSchema' = 'true',
                  'inferSchema' ='true',
                  'delimiter' = ',',
                  'header' = 'true')
  COPY_OPTIONS ('mergeSchema' = 'true');

This also fails

COPY INTO concept
  FROM '/FileStore/tables/prod//ohdsi/demo_cdm/concept/concept.csv'
  FILEFORMAT = CSV
  FORMAT_OPTIONS (
    'delimiter' = ',',
    'header' = 'true')
;

This is the error message

Error in SQL statement: AnalysisException: Failed to merge fields 'concept_id' and 'concept_id'. Failed to merge incompatible data types IntegerType and StringType

One option is to use the DDL to pre-create empty tables, and then read the schema and have DataBricks apply that known schema during the data load. We use the approach below to load the vocabularies we download from Athena (after first running cpt.sh and re-gzipping them and uploading them to blob storage). You’d need to modify this slightly for your needs.

tables = [
'CONCEPT',
'CONCEPT_ANCESTOR',
'CONCEPT_CLASS',
'CONCEPT_RELATIONSHIP',
'CONCEPT_SYNONYM',
'DOMAIN',
'DRUG_STRENGTH',
'RELATIONSHIP',
'VOCABULARY',
]

target_db = 'voc_schema'
data_path = 'wasbs://<your-blob-container>@<your-storage-account>.blob.core.windows.net/'

# Requires that tables have been already created so can read their schemas
for t in tables:
    tgt_t = t.lower()
    spark.sql('TRUNCATE TABLE {db}.{table}'.format(db=target_db, table=tgt_t))
    # get schema of table
    df = spark.sql('SELECT * FROM {db}.{table}'.format(db=target_db, table=tgt_t))
    spark.read.options(delimiter="\t", header="True", dateFormat="yyyyMMdd")\
        .schema(df.schema)\
        .csv(data_path + '/' + t + '.csv.gz')\
        .write.format('delta')\
        .insertInto(target_db + '.' + tgt_t.lower(), overwrite=True)
    spark.sql('REFRESH TABLE {db}.{table}'.format(db=target_db, table=tgt_t))

Thanks @Thomas_White ,

I’ll give this a try and let you know how I make out. I’m currently doing all of this in SQL called from a Java program so I’ll probably need to wrangle this into SQL (at first glace it looks like this should be possible).

I was able to get a similar script to run in my Databricks instance (code is below).

However, I was not able to get truncate to work (edit: truncate seems to be working now, I’m not sure what changed). I’ll gather the details and post them here. Also, my export was missing a couple of tables, I’ll follow up on that as well.

Also, I would like to make this part of a fully automated process. I don’t know how/if I can make a call like this from another program (e.g. make some sort of REST call or do what is done here in SQL, I don’t know what the SQL equivalent of df = spark.sql('SELECT * FROM {db}.{table}'.format(db=target_db, table=tgt_t)) is).

This worked for me!!!

%python
tables = [
'care_site',
'cdm_source',
# 'cohort',
'cohort_definition',
'concept',
'concept_ancestor',
'concept_class',
'concept_relationship',
'concept_synonym',
'condition_era',
'condition_occurrence',
'cost',
'death',
'device_exposure',
'domain',
'dose_era',
'drug_era',
'drug_exposure',
'drug_strength',
# 'episode',
# 'episode_event',
'fact_relationship',
'location',
'measurement',
'metadata',
'note',
'note_nlp',
'observation',
'observation_period',
'payer_plan_period',
'person',
'procedure_occurrence',
'provider',
'relationship',
'source_to_concept_map',
'specimen',
'visit_detail',
'visit_occurrence',
'vocabulary'
]

target_db = 'demo_cdm'
data_path = '/FileStore/tables/prod/ohdsi/demo_cdm/'

for t in tables:
  tgt_t = t.lower()
  df = spark.sql('SELECT * FROM {db}.{table}'.format(db=target_db, table=tgt_t))
  spark.read.options(delimiter=",", header="True", dateFormat="yyyy-MM-dd")\
          .schema(df.schema)\
          .csv(data_path + t + '/' + t + '.csv')\
          .write.format('delta')\
          .insertInto(target_db + '.' + tgt_t.lower(), overwrite=True)
  spark.sql('REFRESH TABLE {db}.{table}'.format(db=target_db, table=tgt_t))
t