Mastering ParDo and DoFn in Apache Beam: A Comprehensive Guide
Understanding ParDo and DoFn in Apache Beam
In a prior tutorial, I discussed several transformation functions in Apache Beam, such as Map, Filter, and CombinePerKey. This session will delve into the ParDo transform, which serves as a more general variant of Map. The key distinction is that ParDo can yield zero, one, or many results for each element of a PCollection, while Map invariably produces exactly one output for every input element. This flexibility makes ParDo especially powerful for filtering, splitting, and other data manipulation tasks.
The other crucial piece of the ParDo transform is the user-defined processing function it runs: a DoFn. Let’s explore some practical examples.
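To make the "zero or more outputs" distinction concrete, here is a minimal pure-Python sketch (the words and length thresholds are made up for illustration): a Map-style callable always returns exactly one result, while a ParDo-style process method may emit nothing, one element, or several.

```python
# A Map-style callable: exactly one output per input.
def to_upper(word):
    return word.upper()

# A ParDo-style process method: zero or more outputs per input.
# Arbitrary rule for illustration: drop short words, emit long words twice.
def process(word):
    if len(word) < 3:
        return []            # zero outputs: the element is filtered out
    if len(word) > 5:
        return [word, word]  # two outputs for one input
    return [word]            # exactly one output

words = ['hi', 'beam', 'pipeline']
map_results = [to_upper(w) for w in words]
pardo_results = [out for w in words for out in process(w)]
print(map_results)    # ['HI', 'BEAM', 'PIPELINE']
print(pardo_results)  # ['beam', 'pipeline', 'pipeline']
```

Note how Map can only transform each element, whereas the ParDo-style function changed the number of elements in the collection.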
You can download this public dataset to follow along:
I utilized a Google Colab notebook for this coding exercise, making the setup straightforward. To install Apache Beam, you can use the following command:
pip install --quiet apache_beam
I created a folder named 'data' to store the CSV file and the outputs from today’s exercise:
mkdir -p data
To begin, I'll focus on the basics: reading the dataset and converting each row into a list, then saving them into a text file. Reading a text file in a Beam pipeline is quite simple. Given that we have a CSV file, we will define a CustomCoder() class that encodes the objects into byte strings, decodes them back to their original form, and specifies that the coder will produce deterministic outputs. Here’s the implementation:
from apache_beam.coders.coders import Coder

class CustomCoder(Coder):
    """Custom coder for reading and writing UTF-8 strings."""

    def encode(self, value):
        return value.encode("utf-8", "replace")

    def decode(self, value):
        return value.decode("utf-8", "ignore")

    def is_deterministic(self):
        return True
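As a quick sanity check of what this coder does, here is its encode/decode logic shown standalone (outside of Beam, with a made-up sample string): encoding a string and decoding it back should round-trip cleanly for valid UTF-8, and the encoded form is a byte string.

```python
# The same encode/decode logic as CustomCoder, shown standalone for illustration.
def encode(value):
    return value.encode("utf-8", "replace")

def decode(value):
    return value.decode("utf-8", "ignore")

original = "10107,30,95.7"          # sample CSV fragment, values made up
encoded = encode(original)
print(type(encoded))                 # <class 'bytes'>
print(decode(encoded) == original)   # True
```

The "replace" and "ignore" error handlers mean malformed characters are substituted or dropped rather than raising an exception, which keeps the pipeline from crashing on dirty input.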
Additionally, we can create a SplitRow() class that utilizes Python’s .split() function to split rows.
class SplitRow(beam.DoFn):
    def process(self, element):
        return [element.split(',')]
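A DoFn's process method may also yield elements instead of returning a list; Beam accepts either style. Here is an equivalent generator version of the split logic, applied to a made-up sample line:

```python
# An equivalent process body written as a generator instead of returning a list.
def process(element):
    yield element.split(',')

line = "10107,30,95.7"  # sample CSV line, values made up
print(list(process(line)))  # [['10107', '30', '95.7']]
```

The generator style is often preferred for DoFns that emit many elements, since outputs are produced lazily rather than built into a list first.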
Once our classes are ready, we define the pipeline, starting with the Pipeline() function and storing it in a variable. Each transformation is initiated with a '|' sign. The first transformation reads the data, and the last one writes it out. The ParDo transform is situated between these two, utilizing the SplitRow class as the DoFn to process each element in the PCollection. Thus, each row from the CSV will be split according to our defined function.
Here’s the complete pipeline code:
import apache_beam as beam
p = beam.Pipeline()
read_write = (p
    | beam.io.ReadFromText('data/sales_data_sample.csv', coder=CustomCoder(), skip_header_lines=1)
    | beam.ParDo(SplitRow())
    | beam.io.WriteToText('data/output'))
p.run()
To check the output:
head -n 5 data/output-00000-of-00001
The output will look something like this:
['10107', '30', '95.7', ...]
['10121', '34', '81.35', ...]
...
Next, we’ll move to a more intricate example where we want to count the number of motorcycles in each status. This requires additional classes, starting with the previously defined SplitRow() for our initial setup.
The filterMotorcycles class filters the dataset, retaining only rows whose 'Productline' column (index 10 in the split row) equals "Motorcycles".
class filterMotorcycles(beam.DoFn):
    def process(self, element):
        if element[10] == "Motorcycles":
            return [element]
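Notice that process returns nothing at all for non-motorcycle rows; Beam treats a process method that returns None as producing zero outputs, which is exactly the "zero or more" behavior that makes ParDo suitable for filtering. A quick standalone illustration (row contents made up, with index 10 as the product line):

```python
# How the filter logic treats two rows (column values made up; index 10 is the product line).
row_keep = ['10107'] + ['x'] * 9 + ['Motorcycles']
row_drop = ['10121'] + ['x'] * 9 + ['Classic Cars']

def process(element):
    if element[10] == "Motorcycles":
        return [element]

print(process(row_keep))  # [['10107', 'x', ..., 'Motorcycles']] — one output
print(process(row_drop))  # None — Beam treats this as zero outputs
```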
Next, we define the PairStatus class, which concatenates the Productline column (index 10) and the Status column (index 6) into a single string and emits a tuple with that string as the key and 1 as the value.
class PairStatus(beam.DoFn):
    def process(self, element):
        return [(element[10] + ', ' + element[6], 1)]
Finally, we implement the Counting class to sum the values associated with each key.
class Counting(beam.DoFn):
    def process(self, element):
        (k, v) = element
        return [(k, sum(v))]
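To see how GroupByKey and Counting fit together, here is what the data looks like between those two steps, simulated in plain Python with a made-up mix of statuses: GroupByKey turns the (key, 1) pairs from PairStatus into one (key, iterable-of-1s) pair per distinct key, and Counting sums that iterable.

```python
# (key, 1) pairs as emitted by PairStatus (sample rows, made-up mix of statuses).
pairs = [('Motorcycles, Shipped', 1),
         ('Motorcycles, Shipped', 1),
         ('Motorcycles, On Hold', 1)]

# What GroupByKey produces: one (key, [values]) pair per distinct key.
grouped = {}
for k, v in pairs:
    grouped.setdefault(k, []).append(v)
print(grouped)  # {'Motorcycles, Shipped': [1, 1], 'Motorcycles, On Hold': [1]}

# What Counting.process then does to each grouped element.
counts = [(k, sum(v)) for k, v in grouped.items()]
print(sorted(counts))  # [('Motorcycles, On Hold', 1), ('Motorcycles, Shipped', 2)]
```

In practice, the GroupByKey plus Counting pair could also be collapsed into a single beam.CombinePerKey(sum) step; the two-step version is used here to show the individual transforms.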
Now, we start a second pipeline by calling beam.Pipeline() again. After reading the data and applying SplitRow(), filterMotorcycles drops every row that is not a motorcycle, PairStatus emits a (key, 1) pair for each remaining row, GroupByKey groups those pairs by key, and Counting sums the ones for each key.
The output will ultimately group and sum the counts of motorcycles by status:
p2 = beam.Pipeline()
shipped_count = (p2
    | beam.io.ReadFromText('data/sales_data_sample.csv', coder=CustomCoder(), skip_header_lines=1)
    | beam.ParDo(SplitRow())
    | beam.ParDo(filterMotorcycles())
    | beam.ParDo(PairStatus())
    | beam.GroupByKey()
    | beam.ParDo(Counting())
    | beam.io.WriteToText('data/output2'))
p2.run()
To verify the results, you can again use:
head -n 5 data/output2-00000-of-00001
The output will summarize the counts of motorcycles in different statuses:
('Motorcycles, Shipped', 324)
('Motorcycles, Disputed', 6)
('Motorcycles, On Hold', 1)
Conclusion
In this tutorial, I aimed to provide practical examples of how ParDo operates in Apache Beam. I trust that the examples and explanations are clear and informative.
Further Reading
- Map, Filter, and CombinePerKey Transforms in Writing Apache Beam Pipelines with Examples | by Rashida Nasrin Sucky | Jul, 2023 | Towards Data Science
- A Detailed Tutorial on Polynomial Regression in Python, Overview, Implementation, and Overfitting | by Rashida Nasrin Sucky | Jun, 2023 | Towards AI
- Using a Keras Tuner for Hyperparameter Tuning of a TensorFlow Model | by Rashida Nasrin Sucky | Jul, 2023 | Towards AI
- Is ChatGPT Taking Away Human Jobs? | by Rashida Nasrin Sucky | Medium
- Complete Implementation of a Mini VGG Network for Image Recognition | by Rashida Nasrin Sucky | Towards Data Science
- Easy Method of Edge Detection in OpenCV Python | by Rashida Nasrin Sucky | Towards Data Science
This video provides an in-depth look at the use of ParDo in Apache Beam, showcasing practical applications and coding examples.
This video elaborates on the lifecycle of DoFn in Apache Beam and outlines user code requirements, enhancing your understanding of the framework.