Data engineering project – Part 1

Title image: Base map and data from OpenStreetMap and OpenStreetMap Foundation

Update – After some more reading and thinking, I would now use a different approach to parsing the XML in this part, based on the warnings about XML vulnerabilities outlined here: https://docs.python.org/3/library/xml.html
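
For reference, a minimal sketch of the kind of swap I mean, using the defusedxml package (the package choice is my assumption, but it is the mitigation suggested in the linked docs) as a drop-in replacement for xml.etree.ElementTree:

#Sketch only: defusedxml.ElementTree mirrors the ElementTree API,
#so the bucket listing can be parsed the same way while guarding against malicious XML
import defusedxml.ElementTree as DET
import requests

url_pref = 'https://storage.googleapis.com/data_eng_test/'

r = requests.get(url_pref)
root = DET.fromstring(r.content)
for child in root:
    for key in child.findall('{http://doc.s3.amazonaws.com/2006-03-01}Key'):
        print(key.text)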

Carto’s careers page continues to prove a source of useful and fun personal projects.

The next project I decided to have a go at was their Data Engineering test.

I have been a bit busy lately, and the task is conveniently broken into three parts, so I will probably cover each of them in a separate post.

Part 1 is to write an ETL process able to do the following:

  • Download Taxi data from Carto’s Google Cloud Storage bucket.
  • Transform the data (if applicable) in a way that Google BigQuery is able to ingest it.
  • Upload data to Google BigQuery.
  • Split the resulting table into data and geometries (data and geometries should be joinable by a common key)

Below is my attempt at addressing these requirements.

One thing to note here is that I didn’t want to go through the rigmarole of setting up a Google BigQuery deployment, so I made the decision to use a PostGIS database I have running on my local machine instead. I did a bit of reading around Google BigQuery, and it all seems pretty well documented and user friendly, so I think the approach I have taken here would translate pretty easily.
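
For what it is worth, here is a rough sketch of what the upload step might look like against BigQuery itself, using the google-cloud-bigquery client library (the project, dataset and table names are placeholders of my own, not part of the test):

#Sketch only: assumes credentials are set via GOOGLE_APPLICATION_CREDENTIALS,
#that pyarrow is installed, and that the dataset already exists in the placeholder project
import pandas as pd
from google.cloud import bigquery

client = bigquery.Client(project='my-project')
table_id = 'my-project.taxi_trips.result_table'

df = pd.read_csv('cleaned_sample.csv')  #placeholder for a cleaned dataframe
job = client.load_table_from_dataframe(df, table_id)
job.result()  #wait for the load job to complete
print('Loaded {} rows into {}'.format(job.output_rows, table_id))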

I split my approach up into four parts.

Download the data:

import os
import datetime as dt
import xml.etree.ElementTree as ET
import requests

url_pref = 'https://storage.googleapis.com/data_eng_test/'

current_date = dt.date.today().strftime('%Y%m%d')

#Function that downloads file from URL
def download_file(file_url,file_name):
    r = requests.get(file_url)
    with open(file_name, 'wb') as fd:
        for chunk in r.iter_content(chunk_size=128):
            fd.write(chunk)

#Create directories to save files into
try:
    os.makedirs(current_date+'/downloaded')
except FileExistsError as exc:
    print(exc)
try:
    os.mkdir(current_date+'/extracted_csvs')
except FileExistsError as exc:
    print(exc)

#Go to GS Bucket, iterate over XML to get Contents links, download content
r = requests.get(url_pref)
root = ET.fromstring(r.content)
for child in root:
    for key in child.findall('{http://doc.s3.amazonaws.com/2006-03-01}Key'):
        target_file_name = key.text
        download_file(url_pref+target_file_name, current_date+'/downloaded/'+target_file_name)

The datasets all come in zip files, so they also need to be unzipped:

import zipfile as zf
import os
import datetime as dt

current_date = dt.date.today().strftime('%Y%m%d')

#Unzip the zipped CSVs
for dirpath, dirnames, files in os.walk(current_date):
    for file_name in files:
        if file_name.endswith('.zip'):
            print('Extracting: '+ file_name)
            #Use dirpath so the path is correct wherever the zip file was found
            with zf.ZipFile(os.path.join(dirpath, file_name), 'r') as target_zip:
                target_zip.extractall(current_date+'/extracted_csvs')

The next step is what I think of as normalizing and quality controlling the data. For the most part, this consisted of making sure the data had the right number of columns, that the columns had headers, and that the headers were correct. There are a number of issues with these datasets, which I am assuming were introduced by Carto deliberately as part of the test: things like missing headers, superfluous extra columns, values in fields that don't conform to the data specification, and values in columns that are valid but don't make sense (e.g. latitudes and longitudes nowhere near New York).

My general philosophy here was to push all the rows with these sorts of issues to a fallout table. In the real world you could then explore these issues further, provide feedback to the data provider, and so on.

So the basic steps here are to read each CSV into a pandas dataframe, carry out the checks, and write to two database tables: one for fallout and one for good data.

While devising these tests I also did a review step where I plotted the data from the good result table, to make sure that no other odd-looking values had made their way through. I haven't included that step in the main code, as I think it is better covered in the next part of the task (which I plan to cover in a later blog post), but the short sketch below gives an idea of it.
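
A minimal sanity-check sketch, assuming the result table created further down and a local connection (the connection details, table name, and sample size are placeholders):

import pandas as pd
import matplotlib.pyplot as plt
from sqlalchemy import create_engine

#Sketch only: connection string and table name are placeholders
engine = create_engine('postgresql+psycopg2://user:password@localhost/db')

#Pull a sample of the good rows back out and eyeball the pickup locations
sample = pd.read_sql('SELECT pickup_longitude, pickup_latitude FROM result20190101 LIMIT 100000', engine)
sample.plot.scatter(x='pickup_longitude', y='pickup_latitude', s=1)
plt.show()

The cleaning and loading script itself is below.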

import os
import uuid
import datetime as dt
from sqlalchemy import create_engine 
import pandas as pd

#database name
db = ''
#database user
db_user = ''
#database hostname
db_host = ''
#database use password
db_password = ''
engine = create_engine('postgresql+psycopg2://'+db_user+':'+db_password+'@'+db_host+'/'+db)

current_date = dt.date.today().strftime('%Y%m%d')

#Field names that should be present in each csv
header_list_for_repair = ['VendorID',
                          'tpep_pickup_datetime',
                          'tpep_dropoff_datetime',
                          'passenger_count',
                          'trip_distance',
                          'pickup_longitude',
                          'pickup_latitude',
                          'RateCodeID',
                          'store_and_fwd_flag',
                          'dropoff_longitude',
                          'dropoff_latitude',
                          'payment_type',
                          'fare_amount',
                          'extra',
                          'mta_tax',
                          'tip_amount',
                          'tolls_amount',
                          'improvement_surcharge',
                          'total_amount']

def clean_csv_push_rows_to_db(input_csv_file):
    fallout=[]
    
    result = pd.read_csv(input_csv_file, sep=r'\||\t', engine='python')
    
    #Check number of columns in the CSV; if more than 19, keep only the first 19
    if len(result.columns) > 19:
        result = result.iloc[:,0:19]
    
    #This section is used to check and repair headers
    list_of_csv_headers = list(result)
    if list_of_csv_headers == header_list_for_repair:
        print(input_csv_file+'    Headers match')
    else:
        print(input_csv_file+'    Headers DO NOT match, adding correct header')
        result.columns = header_list_for_repair
    
    #Various QC tests - rows failing a check are appended to fallout, rows passing are kept
    #(boundary values are treated as valid and kept)
    fallout.append(result[(result['pickup_longitude']>-71.5)|(result['pickup_longitude']<-74.82)])
    result = result[(result['pickup_longitude']<=-71.5)&(result['pickup_longitude']>=-74.82)]
    
    fallout.append(result[(result['pickup_latitude']<40)|(result['pickup_latitude']>41.33)])
    result = result[(result['pickup_latitude']>=40)&(result['pickup_latitude']<=41.33)]
    
    fallout.append(result[(result['dropoff_longitude']>-71.5)|(result['dropoff_longitude']<-74.82)])
    result = result[(result['dropoff_longitude']<=-71.5)&(result['dropoff_longitude']>=-74.82)]
    
    fallout.append(result[(result['dropoff_latitude']<40)|(result['dropoff_latitude']>41.33)])
    result = result[(result['dropoff_latitude']>=40)&(result['dropoff_latitude']<=41.33)]
    
    fallout.append(result[~result['VendorID'].isin([1,2])])
    result = result[result['VendorID'].isin([1,2])]
    
    fallout.append(result[~result['passenger_count'].isin([0,1,2,3,4,5,6,7,8])])
    result = result[result['passenger_count'].isin([0,1,2,3,4,5,6,7,8])]
    
    fallout.append(result[(result['trip_distance'] < 0) | (result['trip_distance'] > 182.9)])
    result = result[(result['trip_distance'] >= 0) & (result['trip_distance'] <= 182.9)]
    
    fallout.append(result[~result['RateCodeID'].isin([0,1,2,3,4,5,6])])
    result = result[result['RateCodeID'].isin([0,1,2,3,4,5,6])]
    
    fallout.append(result[~result['payment_type'].isin([0,1,2,3,4,5,6])])
    result = result[result['payment_type'].isin([0,1,2,3,4,5,6])]
    
    fallout.append(result[(result['fare_amount'] < 0) | (result['fare_amount'] > 408.59)])
    result = result[(result['fare_amount'] >= 0) & (result['fare_amount'] <= 408.59)]
    
    fallout.append(result[(result['extra'] < 0) | (result['extra'] > 1.5)])
    result = result[(result['extra'] >= 0) & (result['extra'] <= 1.5)]
    
    fallout.append(result[(result['tip_amount'] < 0) | (result['tip_amount'] > 111)])
    result = result[(result['tip_amount'] >= 0) & (result['tip_amount'] <= 111)]
    
    fallout.append(result[(result['tolls_amount'] < 0) | (result['tolls_amount'] > 56.28)])
    result = result[(result['tolls_amount'] >= 0) & (result['tolls_amount'] <= 56.28)]
    
    fallout.append(result[(result['improvement_surcharge'] < 0) | (result['improvement_surcharge'] > 0.3)])
    result = result[(result['improvement_surcharge'] >= 0) & (result['improvement_surcharge'] <= 0.3)]
    
    fallout.append(result[(result['total_amount'] < 0) | (result['total_amount'] > 497.62)])
    result = result[(result['total_amount'] >= 0) & (result['total_amount'] <= 497.62)]
    
    fallout.append(result[(result['mta_tax'] < 0) | (result['mta_tax'] > 0.5)])
    result = result[(result['mta_tax'] >= 0) & (result['mta_tax'] <= 0.5)]
    
    #Add a UUID (stored as text) to each row to use as the primary key
    result['uid'] = [str(uuid.uuid4()) for entry in range(len(result.index))]
    
    #Make dataframe of fallouts
    fallout = pd.concat(fallout)
    
    #Push to respective fallout and result tables in database
    result.to_sql('result'+current_date, engine, chunksize=100000, if_exists='append')
    fallout.to_sql('fallout'+current_date, engine, chunksize=100000, if_exists='append')
    
for dirpath, dirnames, files in os.walk(current_date+'/extracted_csvs'):
    for file in files:
        if file[0]!='.':
            print(file)
            clean_csv_push_rows_to_db(current_date+'/extracted_csvs/'+file)

The final part of this step was then to create a primary key field in the good data table in the database (which can be used to link to other tables) and also to create a separate geometry table, as requested in the test. This code creates a table with columns for the origin location as a point, the destination location as a point, and a line feature consisting of the origin and destination. I added the line partly just because I thought it would look cool (and you can see from the title image of this article that I was right!!!).

import datetime as dt
import psycopg2
from psycopg2 import sql

date_stamp = dt.datetime.strftime(dt.date.today(),'%Y%m%d')
#database name
db = ''
#database user
db_user = ''
#database hostname
db_host = ''
#name of attribute table
attributes_table_name = "result"+date_stamp
#name of field to be made the primary key in the attribute table
pkey_field_name = "uid"
#create name of geometry table
geom_table_name = "result"+date_stamp+"_geom"

try:
    #Connect to the database (no password supplied here; add one if your setup requires it)
    conn = psycopg2.connect(f"dbname={db} user={db_user} host={db_host}")
    cur = conn.cursor()
    print('Executing destroy primary key if already exists')
    cur.execute(
        sql.SQL('ALTER TABLE {table} DROP CONSTRAINT IF EXISTS {key};')
        .format(table=sql.Identifier(attributes_table_name),
                key =sql.Identifier(attributes_table_name+'_pkey') 
                )
        )
    print('Executing create primary key')
    cur.execute(
        sql.SQL('ALTER TABLE {table} ADD PRIMARY KEY ({key});')
        .format(table=sql.Identifier(attributes_table_name),
                key = sql.Identifier(pkey_field_name))
        )
    print('Executing drop geometry table')
    cur.execute(
        sql.SQL("DROP TABLE IF EXISTS {geom_table};")
        .format(geom_table=sql.Identifier(geom_table_name))
        )
    print('Executing create geometry table')
    cur.execute(
        sql.SQL("""CREATE TABLE {geom_table} AS
                   SELECT
                       uid,
                       ST_GeomFromText(concat('POINT(', pickup_longitude, ' ', pickup_latitude, ')'), 4326) AS pickup,
                       ST_GeomFromText(concat('POINT(', dropoff_longitude, ' ', dropoff_latitude, ')'), 4326) AS dropoff,
                       ST_GeomFromText(concat('LINESTRING(',
                                              pickup_longitude, ' ', pickup_latitude, ',',
                                              dropoff_longitude, ' ', dropoff_latitude, ')'), 4326) AS line
                   FROM {attr_table};""")
        .format(geom_table=sql.Identifier(geom_table_name),
                attr_table=sql.Identifier(attributes_table_name))
        )
    
    
    conn.commit()
    cur.close()
    conn.close()
    print('Complete')
except Exception as exc:
    print(exc)
    print('Step above failed')
    if 'cur' in locals():
        print(cur.query)
        cur.close()
    if 'conn' in locals():
        conn.close()
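
As a quick check that the split worked and that the attribute and geometry tables are joinable on the common key, something like the following can be run (a sketch only; the connection details and the date stamp in the table names are placeholders):

import pandas as pd
from sqlalchemy import create_engine

#Sketch only: connection string and table names are placeholders
engine = create_engine('postgresql+psycopg2://user:password@localhost/db')

#Join the attribute table back to the geometry table on the shared uid key
check = pd.read_sql("""
    SELECT a.uid, a.fare_amount, ST_AsText(g.pickup) AS pickup
    FROM result20190101 a
    JOIN result20190101_geom g ON a.uid = g.uid
    LIMIT 5;""", engine)
print(check)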

That is it for this part of the task. The next two parts are:

  • Data quality
    • Assess the quality of the data. Identify and document any issues with the data.
  • Write the SQL queries that help you answer the following questions
    • What is the average fare per mile?
    • Which are the 10 pickup taxi zones with the highest average tip?

I plan to cover these in some more blog posts soon.