Migrating our ETL pipeline to Luigi on a Cloud

Eric Escalante · Songkick Tech · Sep 23, 2019


Previously, on Creating a musical (data) pipeline, we shared how we went about creating a custom solution for solving our ETL needs using Go, Python, DataFlow, and BigQuery. This setup allowed us to not only unearth many exciting insights about our concert discovery data, but also share them and provide value to artists and fans alike.

Over a year had passed, and our little ETL project had grown quite a bit. It was about to have its ticket called in as part of our overall migration to the cloud.

As part of migrating it to Google Cloud Platform we also wanted to address the issue of task scheduling. Crons can only get you so far before you start tripping over yourself! The number of tasks began to increase and the dependencies between them started to become a bit unwieldy. What if a job does not finish before a dependent job kicks in? In the case of a vendor outage during one of the jobs, which jobs do we need to re-run? And in what order?

So, after some research, we chose Spotify’s Luigi for its small overhead and straightforward Python approach to task dependency and scheduling.

The scope of the project was thus:

Cloud migration

  • Provide developers with a complete staging environment (BigQuery, Storage Buckets, DataFlow, etc) so they could create new flows and experiment freely without affecting production data
  • Improve local development workflow by using Docker via Makefile commands
  • Remove any credentials from the code, as they would be provided now via consul (our new cloud-based configuration management)

Luigi migration

  • Replace a multitude of crons with just two: a daily and a weekly pipeline
  • Replace all bash scripts with Luigi python task classes, effectively simplifying the codebase
  • Have a clear view of all the task dependencies and their execution status (pending, failed, running, etc)
  • See a history of previous executions for each task
  • Re-run failed tasks automatically

In this post we’ll focus mainly on moving our ETL project to Luigi.

Anatomy of a Luigi task

We replaced all the bash scripts that invoke our ETL tasks with Luigi task classes that have these characteristics:

  • Must have an output() method that produces a tangible output object called a Target. This is how tasks that depend on this one know it finished successfully. The only exceptions to this rule are WrapperTasks, which do not produce any output by themselves.
  • Must have a run() method that will implement the actual job to be performed, for example, invoking a binary that extracts millions of rows from a table and places them as .tsv files in a Google Storage bucket.
  • May have a requires() method that returns a list of one or more tasks that need to be completed before this one can kick in.
  • May have an optional priority attribute to make sure the task is run at the earliest opportunity once its dependencies are satisfied. A minimal sketch putting these pieces together follows this list.
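To make the anatomy concrete, here is a minimal, self-contained sketch of a pair of toy tasks. The class names and file paths are made up for illustration and are not taken from our pipeline:

import luigi

class FetchArtistNames(luigi.Task):
    """Hypothetical upstream task."""
    date = luigi.DateParameter()

    def output(self):
        # The Target other tasks check to know this one finished successfully
        return luigi.LocalTarget("artist_names_%s.txt" % self.date)

    def run(self):
        with self.output().open("w") as out_file:
            out_file.write("Radiohead\nBjork\n")

class CountArtistNames(luigi.Task):
    """Hypothetical downstream task."""
    date = luigi.DateParameter()
    priority = 100  # run as early as possible once dependencies are satisfied

    def requires(self):
        # Must be complete before this task can start
        return FetchArtistNames(self.date)

    def output(self):
        return luigi.LocalTarget("artist_count_%s.txt" % self.date)

    def run(self):
        with self.input().open("r") as in_file, self.output().open("w") as out_file:
            out_file.write("%d\n" % len(in_file.readlines()))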

Given the different sources of data that we process daily, we reduced repetition by creating base classes from which concrete tasks can be implemented. Here’s a sketch of the MySQL extraction base task:

import subprocess

import luigi
from luigi.contrib import gcs

class ExtractTaskMySQL(luigi.Task):
    date = luigi.DateParameter()

    def output(self):
        return gcs.GCSTarget("gs://xxxx")

    def run(self):
        args_string = "-database=%s ..." % self.database  # remaining flags elided
        subprocess.check_call(["make", "extract", "type=mysql", args_string])
        with self.output().open('w') as out_file:
            out_file.write("complete")

    @property
    def database(self):
        raise NotImplementedError('implement me!')
    ...

With this structure, the task that actually extracts artists is just a list of properties:

class ExtractArtists(ExtractTaskMySQL):
    @property
    def fields(self):
        return "id,name..."

    @property
    def table(self):
        return "artists"

    @property
    def database(self):
        return "xxx"

For the next step, loading rows from all of these text files in Google Storage into BigQuery, we also implemented a base class. The LoadTask class follows the same pattern as before: it defines an output() method, a run() method, and a list of mandatory properties.
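We won't reproduce the real LoadTask here, but a stripped-down sketch of its shape, assuming the same Makefile-driven tooling as the extract tasks (the "make load" target and bucket path are placeholders, not our real ones), might look like this:

import subprocess

import luigi
from luigi.contrib import gcs

class LoadTask(luigi.Task):
    """Base task: load extracted .tsv files from Google Storage into BigQuery."""
    date = luigi.DateParameter()

    def output(self):
        return gcs.GCSTarget("gs://xxxx")  # placeholder marker path

    def run(self):
        # Hand off to the same Makefile-driven tooling as the extract tasks
        subprocess.check_call(["make", "load", "type=%s" % self.type, "name=%s" % self.name])
        with self.output().open('w') as out_file:
            out_file.write("complete")

    @property
    def name(self):
        raise NotImplementedError('implement me!')

    @property
    def type(self):
        raise NotImplementedError('implement me!')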

Now comes (one of the many places) where Luigi shines: task dependencies. Here’s what the task of loading artists into BigQuery looks like:

from pipeline.extract.extract_artists import ExtractArtists
from pipeline.load.load_task import LoadTask

class LoadArtists(LoadTask):
    @property
    def name(self):
        return "artists"

    @property
    def type(self):
        return "mysql"

    def requires(self):
        yield ExtractArtists(self.date)

That’s it! We import our ExtractArtists class at the top of the file and then yield it as a dependency from the requires() method.

Luigi task execution

When the time comes to invoke our LoadArtists task, which may itself be listed as a requirement of another task, Luigi follows this sequence (a small example of triggering it appears after the list):

  • Check the output target of this task to determine if it has already run successfully.
  • If not, check the output target of all the tasks returned from this task’s requires() method.
  • Perform the previous two steps for the dependencies of any required task as well.
  • Run any task found that does not have a corresponding target created.
  • Once all dependencies have been satisfied, execute the run() method of this task and create its output target.
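For instance, a one-off run against the local scheduler exercises exactly this resolution. A minimal sketch, assuming LoadArtists lives in a module mirroring the imports above and takes the same date parameter as the extract tasks:

import datetime

import luigi

from pipeline.load.load_artists import LoadArtists  # module path assumed for illustration

# Bring LoadArtists up to date: Luigi checks the targets described above, runs
# ExtractArtists first if its target is missing, then LoadArtists itself.
luigi.build(
    [LoadArtists(date=datetime.date.today())],
    local_scheduler=True,
    workers=2,
)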

So, in this case, invoking the LoadArtists task causes the ExtractArtists task to be executed first, and each task’s requires() builds up a nice dependency tree. To see the whole pipeline at once, you require all of its tasks from a single WrapperTask.
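Here is a hedged sketch of what such a wrapper, like the DailyPipelineWrapper we schedule below, might look like; apart from the LoadArtists import, the details are illustrative rather than our real code:

import datetime

import luigi

from pipeline.load.load_artists import LoadArtists  # module path assumed for illustration

class DailyPipelineWrapper(luigi.WrapperTask):
    """Completes once everything it requires has completed; produces no output itself."""
    date = luigi.DateParameter(default=datetime.date.today())

    def requires(self):
        # Yield every top-level task of the daily pipeline; Luigi derives the
        # rest of the tree from each task's own requires() method.
        yield LoadArtists(self.date)
        # ...and the rest of the daily load tasks

With every top-level task required from one place, Luigi creates something similar to this: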

Dependency tree of our daily pipeline (one happy lookin’ dependency tree)

You can access this view by clicking on the graph icon next to any task in the Luigi UI (another one of Luigi’s many niceties!).


Scheduling our pipeline

Having wrapped all our extract, load and shove jobs inside Luigi tasks, we just needed to decide which ones were meant to run daily and which ones weekly, ending up with just two crontab entries! The daily one looks something like this:

01 * * * * root /usr/local/bin/sk-cron-wrapper.rb  --lock  songkick-etl-daily-pipeline

This cron script performs some setup tasks, like fetching database credentials from consul and setting them up as environment variables in the container, then executes luigi via:

luigi --module pipeline.daily_pipeline_wrapper DailyPipelineWrapper --workers 10

Because Luigi knows the state of every task, the pipeline can be invoked every hour (as the cron shows) and only pending tasks will get executed. For example, at one minute past midnight the pipeline begins running all tasks that have no dependencies concurrently, then works its way up the tree until everything is happy and green.

If during the day a developer deploys a new job, it will get picked up on the next hourly cron and will enter the PENDING state and run as soon as a worker is free.

Luigi will also retry any task that fails a set number of times before raising an error.
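The size of that retry budget is configurable, and recent Luigi versions also let you override the retry policy on an individual task class. A hypothetical example (the task name is made up, and the exact options available depend on your Luigi version):

import luigi

class ExtractFromFlakyVendor(luigi.Task):
    # Hypothetical task hitting an unreliable vendor API: ask the scheduler to
    # retry it up to three times before marking it as failed for good.
    retry_count = 3

    ...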

In summary

Our ETL pipeline now runs from Docker containers in Google Compute instances with complete production, staging and development environments. Thanks to Luigi, we now have a clear birds-eye view of the state of the pipeline as a whole and of individual jobs at any given time. We are at a place where we can look into developing more complex and ambitious workflows with confidence that our architecture can grow with grace and reliability.

The next steps for us are adding Prometheus metrics to improve our job monitoring and alerting, optimising some of the jobs that move insights from BigQuery into MySQL now that we are using Google Cloud SQL, and so much more. Luigi could say that the cloud is now his oyster (if that made any sense).

Thanks for reading this far! We would love to hear any suggestions or answer any questions you may have in the comments below.
