Motivation: in modern businesses we increasingly have data in many different systems and services, and getting it all into a central location (“warehouse”), or getting all the parts to talk to each other, is a viable career and a full-time job. To do wizard data science work we need the freshest data from all sources, ideally aligned in a single location, and this is typically done with data pipeline tools and scripts. Through trial and error I’ve learned a lot about building these pipelines, and these are some notes on what has worked best, and how I’ll “get it right” from the outset the next time!
ETLs
The methodology of pulling data out of one software system (via some sort of application programming interface or API) and putting it into a database or other system is commonly referred to as an Extract-Transform-Load (ETL) process. There is also advocacy for ELT patterns (transforming last), but regardless of the location of the T I’ve found that the E and L parts are the most critical. In terms of operational efficiency and the need for constant maintenance, the acquisition of data (E) and storage (L) have to be bug free and running cleanly.
There are a plethora of companies that provide E(T)L services, but being at a scrappy startup, and never finding a cost-effective solution that fit my specific needs, I jumped right in to build my own. When I built out my first ETL jobs it made sense to modularize the process, and make extensive use of interfaces in Python. The three parts of any ETL job are:
Get some data (Extract): most commonly via a REST API, GraphQL, or SQL from another database. I want all the details of these fetching processes (handling error responses, timeouts, etc.) hidden from the code managing the data job for a specific system. An interface to the Extract module will take the type of call, the host endpoint, the authentication keys, and any source-specific keyword arguments. It will then return a list of the data records retrieved (typically JSON dictionaries).
Check the quality of the data and pull out the parts we want (Transform in the most minimal sense). This is the part I still have to write in a custom way for each new API I go up against. How is pagination handled? Can the most recently updated records be requested or is there another sorting mechanism? Are the records under a “data” key in the dictionary? Finally, I convert it into a pandas DataFrame to pass off to the final step.
Push the results into a warehouse (Load): again, I want a single call through another interface that handles the vagaries of authentication, formatting, and loading the data into whatever destination we are heading for. The nuances of Redshift vs Postgres 14 vs MySQL 8, setting up SSH tunnels, and cleaning up connections can all be handled in this separate module. It should take my DataFrame and return a success indicator of some kind.
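A minimal sketch of the three-part split described above, using only the standard library. The function names, the `fake_fetch` stand-in, and the use of SQLite in place of a real warehouse are all assumptions for illustration, and plain lists of dictionaries stand in for the pandas DataFrame so the example has no dependencies.

```python
import sqlite3

def extract(fetch, call_type, host, auth_key, **source_kwargs):
    """Extract: call the source and return raw records; fetch errors stay in here."""
    try:
        payload = fetch(call_type, host, auth_key, **source_kwargs)
    except Exception:
        return []  # a real module would log and possibly retry here
    # Many APIs nest their records under a "data" key
    return payload.get("data", [])

def transform(records, wanted):
    """Transform (minimal): keep only the wanted fields, drop records without an id."""
    return [{k: r.get(k) for k in wanted} for r in records if r.get("id") is not None]

def load(conn, table, rows):
    """Load: write rows to the destination and return a success indicator."""
    if not rows:
        return True
    cols = list(rows[0])
    try:
        conn.execute(f"CREATE TABLE IF NOT EXISTS {table} ({', '.join(cols)})")
        conn.executemany(
            f"INSERT INTO {table} VALUES ({', '.join('?' for _ in cols)})",
            [tuple(r[c] for c in cols) for r in rows],
        )
        conn.commit()
        return True
    except sqlite3.Error:
        return False

def fake_fetch(call_type, host, auth_key, **kwargs):
    # Hypothetical stand-in for a real REST/GraphQL/SQL call
    return {"data": [{"id": 1, "email": "a@example.com", "junk": 0}, {"email": "no-id"}]}

conn = sqlite3.connect(":memory:")
raw = extract(fake_fetch, "rest", "https://api.example.com", "secret", updated_since="2024-01-01")
ok = load(conn, "contacts", transform(raw, ["id", "email"]))
```

The job script only sees three calls; everything source-specific or destination-specific lives behind them.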
Clean Up Script
Starting at the end, I put this as boilerplate in every script, and even if it’s not needed it provides a place to put clean-up code in the future. I have this line at the end of every main function:
import common.cleanup
This points to a simple cleanup.py in a /common directory. At first it was empty, but it has become a powerful aid as the EL landscape has grown more complex. Cleaning up temporary files, closing any SQL or SSH connections, etc. can all be taken care of in one place.
created_at & updated_at
This is more of a database management idea, but including created_at and updated_at timestamp columns for each entry should be the default for every table. I have it written into the load modules to add those columns to any DataFrame headed for the warehouse, and to refresh the updated_at value on any UPDATE SQL call.
In addition to querying for the most recent data for analysis, it is also incredibly helpful in identifying stale data or cases where jobs failed silently. If we’re expecting every table to have at least some updated records every day it’s a quick matter to check for tables with old maximum updated_at dates and locate the failing job.
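A small sketch of both halves of this idea: stamping outgoing rows, and scanning for tables whose newest updated_at is too old. The helper names, the one-day threshold, and SQLite as the stand-in warehouse are assumptions made for the example.

```python
import sqlite3
from datetime import datetime, timedelta, timezone

def add_timestamps(rows, now=None):
    """Stamp each outgoing row with created_at and updated_at (ISO-8601, UTC)."""
    stamp = (now or datetime.now(timezone.utc)).isoformat()
    return [{**row, "created_at": stamp, "updated_at": stamp} for row in rows]

def stale_tables(conn, tables, max_age=timedelta(days=1), now=None):
    """Return tables whose newest updated_at is older than max_age (or missing)."""
    now = now or datetime.now(timezone.utc)
    stale = []
    for table in tables:
        (latest,) = conn.execute(f"SELECT MAX(updated_at) FROM {table}").fetchone()
        if latest is None or now - datetime.fromisoformat(latest) > max_age:
            stale.append(table)
    return stale

conn = sqlite3.connect(":memory:")
now = datetime(2024, 5, 2, tzinfo=timezone.utc)
# contacts was refreshed "today", invoices three days ago
for table, age_days in [("contacts", 0), ("invoices", 3)]:
    conn.execute(f"CREATE TABLE {table} (id, created_at, updated_at)")
    row = add_timestamps([{"id": 1}], now=now - timedelta(days=age_days))[0]
    conn.execute(f"INSERT INTO {table} VALUES (:id, :created_at, :updated_at)", row)

stale = stale_tables(conn, ["contacts", "invoices"], now=now)  # only invoices is stale
```

Running a check like this once a day points straight at the table, and therefore the job, that has gone quiet.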
Soft Delete
Another database idea: in many cases hard deleting data is necessary, to keep only the up-to-date, current state of data. However, soft-deletion should be kept in mind whenever possible. This is a process of keeping a “deleted_at” column in the table and performing something like:
UPDATE my_table SET deleted_at = NOW() WHERE ...
with your own criteria filled in. Un-deleting becomes possible, and old entries can be retrieved later if needed.
HubSpot (a CRM I’ve worked with extensively) includes just such an indicator with most of the data object types that can be fetched via their API. However, the bulk endpoints only return non-deleted records, so for a while I was confused why deleting things in the web app did not change the corresponding records in our warehouse. It turns out one has to request EACH record individually to get the deleted status. 🤦🏼‍♂️
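The deleted_at pattern can be tried end to end with a few lines of Python. This is a sketch against an in-memory SQLite database with a made-up `contacts` table; SQLite spells NOW() as CURRENT_TIMESTAMP, so the statement differs slightly from the Postgres-style one above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE contacts (id INTEGER, email TEXT, deleted_at TEXT)")
conn.executemany("INSERT INTO contacts (id, email) VALUES (?, ?)",
                 [(1, "a@example.com"), (2, "b@example.com")])

def soft_delete(conn, contact_id):
    # Mark the row instead of removing it
    conn.execute("UPDATE contacts SET deleted_at = CURRENT_TIMESTAMP WHERE id = ?",
                 (contact_id,))

def undelete(conn, contact_id):
    # Clearing the marker brings the row back
    conn.execute("UPDATE contacts SET deleted_at = NULL WHERE id = ?", (contact_id,))

def live_ids(conn):
    # Analysis queries filter on deleted_at to see only current rows
    return [r[0] for r in conn.execute(
        "SELECT id FROM contacts WHERE deleted_at IS NULL ORDER BY id")]

soft_delete(conn, 2)
after_delete = live_ids(conn)    # row 2 is hidden but still stored
undelete(conn, 2)
after_undelete = live_ids(conn)  # row 2 is visible again
```

The cost is that every downstream query has to remember the `deleted_at IS NULL` filter, which is often worth wrapping in a view.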
Logging
First, get hip to the logging module in Python, and make liberal use of logging.debug() and logging.error() calls. Include the kwarg exc_info=True in any call made from an except block to get the exception traceback in the log.
import logging
Look at configuring logging to write to a directory. I use a TimedRotatingFileHandler instance to keep the files manageable over months.
Log every call, every request, every result. I have found this immensely valuable both in assessing the inefficiencies of my data scripts and in finding breaks and showing my work to others across the business.
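A minimal sketch of that kind of configuration. The rotation schedule (midnight), the 90-file retention, the format string, and the `configure_logging` helper are all assumptions chosen for the example rather than the settings used in production.

```python
import logging
import os
import tempfile
from logging.handlers import TimedRotatingFileHandler

def configure_logging(log_dir, name="etl"):
    """Write DEBUG-and-up to log_dir/<name>.log, rotating at midnight, keeping 90 old files."""
    os.makedirs(log_dir, exist_ok=True)
    handler = TimedRotatingFileHandler(
        os.path.join(log_dir, f"{name}.log"),
        when="midnight", backupCount=90, encoding="utf-8",
    )
    handler.setFormatter(logging.Formatter(
        "%(asctime)s %(levelname)s %(module)s:%(funcName)s %(message)s"))
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)
    return logger

log_dir = tempfile.mkdtemp()  # stand-in for a real log directory
logger = configure_logging(log_dir)
logger.debug("starting job")
try:
    int("not a number")
except ValueError:
    # exc_info=True appends the traceback to the log record
    logger.error("could not parse value", exc_info=True)
```

Rotated files get a date suffix, and `backupCount` caps how many are kept, so the directory does not grow without bound.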
Given a single interface method for the Extract stage, it’s straightforward to place a decorator function around it to capture:
Calling function name and module name
Length of arguments (will catch length of query or curl request)
Time of initiation
Execution time
Returned data size
This can be cached and pushed to a warehouse logging table on job cleanup!
The code I have in production is patterned like this:
import inspect
import logging
import sys
import time
from functools import wraps

def LogDBCall(func):
    """Wrapper for logging all database calls."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        x = {}  # stays usable even if gathering the call details fails
        try:
            # args[0] is usually the SQL query, or in DataFrame loading it is the frame
            caller = inspect.stack()[1]
            x = {'function_name': func.__name__,
                 'command_length': sys.getsizeof(args[0]),
                 'start_time': time.time(),
                 'filename': caller.filename,
                 'calling_function_name': caller.function,
                 'call_type': 'sql'}
        except Exception:
            logging.error('database:LogDBCall:wrapper', exc_info=True)
        start_time = time.perf_counter()
        value = func(*args, **kwargs)
        run_time = time.perf_counter() - start_time
        try:
            x['execution_time'] = run_time
            if value is not None:
                x['return_bytes'] = sys.getsizeof(value)
            for k, v in kwargs.items():
                x['arg_{}'.format(k)] = v
            # CollectLogEntries (not shown here) caches the entry for later upload
            CollectLogEntries(x)
        except Exception:
            logging.error('database:LogDBCall:wrapper', exc_info=True)
        return value
    return wrapper
Never Fail
The EL job itself cannot error out part way through. A job that fails midway dramatically raises the chance of gaps, duplicates, and erroneous entries. It makes a mess and is hard to clean up. Obviously careful but frequent use of try/except/finally blocks is critical, and getting the nesting right is important (do we try-wrap each individual call or the request pagination loop as a whole? What do we do with partial results? Etc.). Response codes of 400 and above won’t necessarily break the script, but they need to be handled and logged too.
When something fails we want to note what failed (the query or request that failed should be logged in its entirety), what the error messages were (lots of logging.error()), and what we’re doing about it (throwing away the partial results? Trying again? Moving on to the next job?).
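One way to answer those nesting questions, as a sketch: wrap each page request individually, retry a bounded number of times, and hand back the partial results together with a flag saying whether the run completed. The `fetch_page` callable and its `(status_code, body)` return shape are hypothetical.

```python
import logging

def fetch_all_pages(fetch_page, max_retries=2):
    """Collect records page by page; never raise, always report what happened.

    fetch_page(page) is a hypothetical callable returning (status_code, body),
    where body looks like {"data": [...], "has_more": bool}.
    """
    records, page, complete = [], 1, False
    while True:
        body = None
        for attempt in range(1, max_retries + 2):  # first try plus max_retries retries
            try:
                status, payload = fetch_page(page)
                if status >= 400:
                    logging.error("page %s returned HTTP %s (attempt %s)", page, status, attempt)
                    continue
                body = payload
                break
            except Exception:
                logging.error("page %s raised (attempt %s)", page, attempt, exc_info=True)
        if body is None:
            break  # give up on this source but keep the partial results
        records.extend(body.get("data", []))
        if not body.get("has_more"):
            complete = True
            break
        page += 1
    return records, complete

calls = {"n": 0}
def flaky_fetch(page):
    # Simulated source: one timeout on the second call, then page 3 always fails
    calls["n"] += 1
    if calls["n"] == 2:
        raise TimeoutError("simulated timeout")
    if page == 3:
        return 500, {}
    return 200, {"data": [page], "has_more": True}

records, complete = fetch_all_pages(flaky_fetch)  # ([1, 2], False)
```

The caller decides what to do with an incomplete result (discard, load anyway, or reschedule), and the log already records which page failed and why.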
In order to never fail I’ve become increasingly fond of making any non-numeric data into text (VARCHAR, in the Postgres world). This requires a lot of casting to use the data, but it makes the EL jobs run with much higher reliability. I’ve lost far too much time cleaning up after someone has managed to put text into what should have been a number or date property, causing the database to reject the entire Load. Instead of relying on the system (CRM, billing, BI, whatever) to enforce data types, we just absorb it all, complete the Load, and worry about checking for empty or wrong-type data when we’re analyzing or building reports.
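The same absorb-now, cast-later idea can be sketched on the Python side. The helper names and the date format are assumptions for illustration: values are turned into text before the Load, and the casts used at analysis time return None instead of raising.

```python
from datetime import datetime

def stringify(record):
    """Before Load: keep numbers, turn everything else into text (None stays None)."""
    def is_number(v):
        return isinstance(v, (int, float)) and not isinstance(v, bool)
    return {k: v if v is None or is_number(v) else str(v) for k, v in record.items()}

def to_float(text):
    """At analysis time: cast if possible, otherwise None."""
    try:
        return float(text)
    except (TypeError, ValueError):
        return None

def to_datetime(text, fmt="%Y-%m-%d"):
    """At analysis time: parse a date in the given format, otherwise None."""
    try:
        return datetime.strptime(text, fmt)
    except (TypeError, ValueError):
        return None

row = stringify({"amount": 12.5, "close_date": "soon", "tags": ["a", "b"], "owner": None})
amount = to_float(row["amount"])            # 12.5
close_date = to_datetime(row["close_date"])  # None: someone typed text into a date field
```

The bad value still lands in the warehouse, where it can be counted and reported, rather than aborting the whole Load.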
At some point I’ll have an entire post about the number of different ways one can cast a text entry to a timestamp in SQL. Who knew there were so many ways to skin the proverbial cat?
Reporting
Once everything is logged I think about ETL reporting in two ways, for me and for everyone else.
For me: I look at daily and weekly views of how much data is going where, which calls are most frequent, which sources take the longest, on and on. How are the ETL jobs performing? Which ones are the most efficient and which ones could use better optimization? Also, if something breaks I turn to my ETL warehouse logs to find the source, the scope, and the fix.
For others: ideally the data is shipped around behind the scenes, under the hood, and no one else needs to know how it all works. However, it takes a significant amount of work to get the engine humming smoothly, and showing that this work is valuable is necessary for getting support and recognition for the job. It may not be especially tangible, but reporting high-level metrics (how much data is shipped each night, how many database calls are made, etc.) gives some notion of how many gears and motors and drive belts are spinning away keeping data up to date and centralized.
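Both kinds of report start from the same roll-up of the log entries. A sketch, assuming each entry is a dictionary with the `start_time`, `call_type`, `execution_time`, and `return_bytes` keys used by the logging decorator; the `daily_summary` name and grouping are illustrative.

```python
from collections import defaultdict
from datetime import datetime, timezone

def daily_summary(entries):
    """Roll log entries up to (day, call_type): call count, total seconds, total bytes."""
    summary = defaultdict(lambda: {"calls": 0, "seconds": 0.0, "bytes": 0})
    for e in entries:
        day = datetime.fromtimestamp(e["start_time"], tz=timezone.utc).date().isoformat()
        row = summary[(day, e.get("call_type", "unknown"))]
        row["calls"] += 1
        row["seconds"] += e.get("execution_time", 0.0)
        row["bytes"] += e.get("return_bytes", 0)
    return dict(summary)

entries = [
    {"start_time": 1700000000, "call_type": "sql", "execution_time": 0.25, "return_bytes": 100},
    {"start_time": 1700000060, "call_type": "sql", "execution_time": 0.5, "return_bytes": 200},
    {"start_time": 1700000000 + 86400, "call_type": "rest", "execution_time": 2.0},
]
summary = daily_summary(entries)
```

Sorting the result by total seconds shows which sources deserve optimization; summing calls and bytes gives the headline numbers for everyone else.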
De-duplication
Often we have data with a unique id key for the table (such as Contact ID, Account ID, event ID) and if the record changes we want to update the entry and keep a single row for each ID value. Some databases will help enforce this, but AWS Redshift does not. Here’s the bit of SQL I use to perform a de-duplication job on a table (note: relies on accurate “updated_at” timestamps to identify the most recent of each duplicate entry):
CREATE TABLE {{table_name}}_duplicates AS
SELECT * FROM (
  SELECT
    {{id_column}},
    "updated_at",
    ROW_NUMBER() OVER (PARTITION BY {{id_column}}
                       ORDER BY "updated_at" DESC) AS dupcnt
  FROM
    {{table_name}}
) AS ranked
WHERE
  dupcnt > 1;
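We’re creating a helper table containing any duplicate entry IDs and their updated_at timestamps. It is a regular table rather than a true temporary one, so drop it once the job is done. (Replace the handlebar {{}} variables with your actual table and id names.) One caveat: rows that share both the same ID and the exact same updated_at value will all match the DELETE that follows, including the copy we meant to keep, so exact ties need separate handling.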
Finally, we use the temporary table to delete the old, duplicate entries from the original table:
DELETE FROM
{{table_name}}
USING
{{table_name}}_duplicates t2
WHERE
t2.updated_at = {{table_name}}.updated_at
AND t2.{{id_column}} = {{table_name}}.{{id_column}}
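Filling in the handlebar variables by hand gets old quickly. A small sketch of a renderer that only accepts plain SQL identifiers as values, so a stray table name cannot smuggle extra SQL into the statement; `render_sql` and the identifier rule are assumptions for this example, not part of the production code.

```python
import re

IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")

def render_sql(template, **names):
    """Fill {{name}} placeholders, allowing only plain SQL identifiers as values."""
    for key, value in names.items():
        if not IDENTIFIER.fullmatch(value):
            raise ValueError(f"unsafe identifier for {key}: {value!r}")

    def substitute(match):
        return names[match.group(1)]  # KeyError if a placeholder has no value

    return re.sub(r"\{\{(\w+)\}\}", substitute, template)

DELETE_DUPLICATES = """
DELETE FROM {{table_name}}
USING {{table_name}}_duplicates t2
WHERE t2.updated_at = {{table_name}}.updated_at
  AND t2.{{id_column}} = {{table_name}}.{{id_column}};
"""

sql = render_sql(DELETE_DUPLICATES, table_name="contacts", id_column="contact_id")
```

The rendered string can then be passed to whatever database call the Load module already exposes.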
Dr. Ben Smith is a Data Scientist and thinker, fascinated by the appearance of computers in our daily lives, creativity, and human struggles. He has had the privilege to think, learn, and write at the University of Illinois, the National Center for Supercomputing Applications, the Cleveland Institute of Art, Case Western Reserve U., IUPUI, and at Boardable: Board Management Software, Inc.