Scaling ETL Pipeline for creating TF-Records using Apache Beam Python SDK on Google Cloud Dataflow

Swapnil Masurekar
6 min read · Jul 25, 2020

In this blog I will show an example of horizontally scaling the process of creating tfrecords for a computer vision dataset; similar steps can be followed for any semi-structured machine learning dataset. If you are already familiar with creating tfrecords, you can find the code directly in the GitHub repo by clicking here.

A Brief Introduction to Tools ….

Dataflow is a fully managed ETL (Extract, Transform, and Load) service on GCP used for data pre-processing and conversion. It is a pipeline service for processing streaming and batch data that implements workflows. In our case we use it to load images from a bucket, augment the images, and save them as tfrecords in a GCS bucket.

Apache Beam is a framework for defining pipeline tasks. Dataflow is optimized for running Beam pipelines, so we need to wrap our whole ETL task into a Beam pipeline. Apache Beam ships with its own predefined transforms, including composite transforms, but it also provides the flexibility to write your own (user-defined) transforms and use them in the pipeline. Each user-defined transform should be designed for a specific task. The basic syntax for a user-defined transform is shown below.
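
As a minimal sketch, a user-defined (composite) transform subclasses beam.PTransform and overrides expand(); the class name and step labels below are illustrative placeholders, not from the original post.

import apache_beam as beam

# A minimal sketch of a user-defined (composite) transform;
# the class name and step labels are illustrative placeholders
class MyCustomTransform(beam.PTransform):
    def expand(self, pcollection):
        return (
            pcollection
            | 'StripWhitespace' >> beam.Map(lambda line: line.strip())
            | 'DropEmptyLines' >> beam.Filter(lambda line: len(line) > 0)
        )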

To learn the basic concepts of creating data pipelines in Python using the Apache Beam SDK, refer to this tutorial.

Planning your pipeline …

Now, in order to create tfrecords we need to load each data sample, preprocess it, and build a tf-example so that it can be directly fed to an ML model. With the traditional approach using tf.python_io.TFRecordWriter() we need to iterate over every sample in the entire dataset, convert it to a tf-example, and then write it to a tf-record. For huge datasets this consumes a lot of time. A better way to optimize our code is to scale the entire task over multiple machines (i.e. scale horizontally). But even this is not completely optimal, since each small task in the code (such as reading data from memory, performing preprocessing operations, creating a tf-example, serializing the example, writing the serialized example, etc.) does not take an equal amount of time. So the best approach is to horizontally scale each small task onto a number of machines proportional to how much time that task needs to execute (e.g. assign more machines to reading the data than to serializing the examples).

Fortunately, using Apache Beam and Google Dataflow we can achieve this quite easily. You need to break down the whole task of loading images and finally saving them into tfrecords into a number of smaller tasks. Each subtask must be designed so that it operates on a single sample (in our case a sample is an image). Here I will explain with the example of an image classification dataset. The subtasks for this example are:

  • Extract:
    - Read Text line from CSV
    - Decode the text line string into path and label
    - Load the image from the decoded Google Cloud Storage path
  • Transform:
    - Convert image and label into tf-example such that it can be directly fed to the ML model
    - Serialize the tf-example to string
  • Load:
    - Write the serialized example to tf-record

As you can see, these are all the basic steps required to create the tfrecords. Now we will write the code for each subtask individually. For the complete code, you may visit this GitHub repo: https://github.com/swapnil3597/dataflow-tfrecord

Extract:

You may have the metadata for your actual data stored in a text file or a BigQuery table. During extraction, you can read from CSV, BigQuery, or bucket.list_blobs() in GCS.

Make sure that the first block you use in the pipeline is a built-in Beam function and not a custom function, since custom DoFns (explained later) do not support the windowing that is used to enable horizontal scaling.

Below is a Python code example to create the first block in your pipeline:
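
A minimal sketch of this first block, assuming the metadata lives in a CSV of "image_path,label" lines; the CSV_PATH value is a placeholder.

import apache_beam as beam

# Hypothetical GCS path to a CSV holding "image_path,label" lines
CSV_PATH = 'gs://your-bucket/metadata.csv'

# Built-in Beam source: yields one text line at a time and can be
# split across workers by the Dataflow runner
read_csv_lines = beam.io.ReadFromText(CSV_PATH)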

The above function yields the text lines in your CSV or text file one at a time. You have to decode this text line (or it might be a row from a BigQuery table) using a custom function to extract the metadata required to read your actual data from storage (in the case of NLP problems you may directly have your actual data in text or CSV format).

Defining Custom DoFns in Python:

A custom DoFn has the following format:
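
A minimal sketch of that format; CustomDoFn and do_something are placeholder names for your own class and per-element logic.

import apache_beam as beam

# A minimal sketch; CustomDoFn and do_something are placeholder names
class CustomDoFn(beam.DoFn):
    def process(self, value):
        # 'value' is the element yielded by the previous block in the pipeline
        output_value = do_something(value)  # your per-element logic goes here
        # yield the result so the next block in the pipeline receives it
        yield output_value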

Here, the process() function receives the yielded output value of the prior block as the input argument value; we can then extract the required information from this input argument, process it, and obtain the output values which need to be yielded for the next block in the pipeline.

Using the above method you can define your custom DoFns to decode the text line and load the data into the VM instance's memory (refer to the GitHub code here).

[Refer DecodeFromTextLineDoFn() and LoadImageDoFn()]

Transform:

Once the data is loaded, you can create the tf-example using a DoFn like the following:
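
A sketch assuming each incoming element is an (image_bytes, label) pair; the class and feature names are illustrative placeholders, and the actual implementation is in the GitHub repo.

import apache_beam as beam
import tensorflow as tf

# A sketch assuming each element is an (image_bytes, label) pair;
# the class and feature names are illustrative placeholders
class CreateTFExampleDoFn(beam.DoFn):
    def process(self, value):
        image_bytes, label = value
        feature = {
            'image': tf.train.Feature(
                bytes_list=tf.train.BytesList(value=[image_bytes])),
            'label': tf.train.Feature(
                int64_list=tf.train.Int64List(value=[label])),
        }
        # Yield the tf.train.Example; it is serialized in the next block
        yield tf.train.Example(features=tf.train.Features(feature=feature))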

To serialize the examples yielded by this DoFn you can write a lambda function directly in the pipeline (refer to "Defining the pipeline" below for better understanding):

beam.Map(lambda x: x.SerializeToString())

Load:

To write the serialized examples as tfrecords to the bucket we have to use Beam's own tfrecord writer. This writer writes the tfrecords to the output path provided as an argument. The number of tfrecord shards to be written can be controlled using num_shards. Below is an example:

# Write serialized examples to tfrecords
write_to_tf_record = beam.io.tfrecordio.WriteToTFRecord(
    file_path_prefix=OUTPUT_PATH,
    num_shards=20)

Defining the pipeline:

Pipeline Options:

The following arguments/options need to be provided in order to run the pipeline on Dataflow.
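
A minimal sketch of these options; the project, region, and bucket names below are placeholders.

from apache_beam.options.pipeline_options import PipelineOptions

# A minimal sketch; project, region, and bucket names are placeholders
pipeline_args = [
    '--project=your-gcp-project',
    '--region=us-central1',
    '--runner=DataflowRunner',        # use DirectRunner for local debugging
    '--staging_location=gs://your-bucket/staging',
    '--temp_location=gs://your-bucket/temp',
    '--setup_file=./setup.py',
    # '--template_location=gs://your-bucket/templates/tfrecord-template',  # optional
]
pipeline_options = PipelineOptions(pipeline_args)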

Here all the arguments are required except for --template_location. The runner must be specified as DataflowRunner in order to run your pipeline on Google Dataflow; DirectRunner can be specified instead for debugging purposes (this will make your entire pipeline run on a single machine).

setup.py is required because a package needs to be created in order to run the code on Dataflow. In the setup file we define the packages required by the module.
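
A minimal sketch of such a setup.py; the package name and dependency list are placeholders, and the actual file is in the GitHub repo.

# setup.py -- a minimal sketch; name and dependencies are placeholders
import setuptools

setuptools.setup(
    name='dataflow-tfrecord',
    version='0.0.1',
    install_requires=[
        'tensorflow',
    ],
    packages=setuptools.find_packages(),
)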

If you want to deploy/stage your pipeline and run it in a variety of environments, you may deploy it as a template and reuse it as often as you like. The --template_location argument is optional and is to be provided only if you want to deploy the Dataflow template to a bucket. To read more about templates you may follow the official documentation.

Defining Pipeline:

All the pipeline blocks which you have defined before need to be wrapped in the following way to run the beam pipeline:
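
A sketch of that wiring, reusing the placeholder names from the examples above (CSV_PATH, OUTPUT_PATH, pipeline_options, CreateTFExampleDoFn) and the DoFns from the repo (DecodeFromTextLineDoFn, LoadImageDoFn); the step labels are illustrative, and the actual pipeline is in the GitHub repo.

import apache_beam as beam

# A sketch reusing the placeholder names from the examples above;
# the step labels are illustrative, the actual pipeline is in the repo
with beam.Pipeline(options=pipeline_options) as pipeline:
    _ = (
        pipeline
        | 'ReadCSVLines' >> beam.io.ReadFromText(CSV_PATH)
        | 'DecodeTextLine' >> beam.ParDo(DecodeFromTextLineDoFn())
        | 'LoadImage' >> beam.ParDo(LoadImageDoFn())
        | 'CreateTFExample' >> beam.ParDo(CreateTFExampleDoFn())
        | 'SerializeExample' >> beam.Map(lambda x: x.SerializeToString())
        | 'WriteToTFRecord' >> beam.io.tfrecordio.WriteToTFRecord(
            file_path_prefix=OUTPUT_PATH, num_shards=20)
    )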

You may have noticed that the custom DoFns are wrapped as beam.ParDo(custom_do_fn) in the pipeline.

Running Pipeline:

In order to run the pipeline, you simply have to run the Python code. For the directory structure required for the Python package, refer to the GitHub code.

Once you run the pipeline you will be able to see the following graph in the Google Dataflow UI:

The graph in Google Dataflow User Interface

The pipeline may take 4–5 minutes to run, and tfrecords will be created at the GCS output path provided, as shown below:

Tfrecords at output path in GCS bucket

I hope you were able to follow the blog. For any doubts or queries, post them in GitHub issues.
