whalebeings.com

Mastering ParDo and DoFn in Apache Beam: A Comprehensive Guide

Understanding ParDo and DoFn in Apache Beam

In a prior tutorial, I discussed several transformation functions in Apache Beam, such as Map, Filter, and CombinePerKey. This session delves into the ParDo transform, which serves as a more general variant of Map. The key distinction is that ParDo operates on each element of a PCollection and can yield zero or more outputs, while Map always produces exactly one output per input element. This flexibility makes ParDo especially powerful for data-manipulation tasks.

Another crucial component of the ParDo transform is the requirement for user-defined functions (DoFn). Let’s explore some practical examples.
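To make the zero-or-more distinction concrete, here is a plain-Python sketch (no Beam required) of the two behaviors; inside a DoFn, the process method plays the pardo_style role:

```python
def map_style(element):
    # Map: exactly one output per input element
    return element.upper()

def pardo_style(element):
    # ParDo-style process: a generator may emit zero, one, or many outputs
    for word in element.split():
        yield word

print(map_style("shipped"))                   # one output: 'SHIPPED'
print(list(pardo_style("hello beam world")))  # three outputs from one element
print(list(pardo_style("")))                  # zero outputs: []
```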

You can download the public sales dataset used throughout this post (sales_data_sample.csv) to follow along.

I utilized a Google Colab notebook for this coding exercise, making the setup straightforward. To install Apache Beam, you can use the following command:

pip install --quiet apache_beam

I created a folder named 'data' to store the CSV file and the outputs from today’s exercise:

mkdir -p data

To begin, I'll focus on the basics: reading the dataset, converting each row into a list, and saving the results to a text file. Reading a text file in a Beam pipeline is quite simple. Since we have a CSV file, we first define a CustomCoder class that encodes objects into UTF-8 byte strings, decodes them back to strings, and declares that its output is deterministic. Here's the implementation:

from apache_beam.coders.coders import Coder

class CustomCoder(Coder):
    """Custom coder for reading and writing UTF-8 strings."""

    def encode(self, value):
        return value.encode("utf-8", "replace")

    def decode(self, value):
        return value.decode("utf-8", "ignore")

    def is_deterministic(self):
        return True

Additionally, we create a SplitRow class, a DoFn that uses Python's str.split() method to split each row on commas.

class SplitRow(beam.DoFn):
    def process(self, element):
        return [element.split(',')]
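To see what SplitRow produces, here is the same split applied to a made-up CSV line (the field values are illustrative, not from the real dataset). One caveat worth knowing: a naive .split(',') breaks on quoted fields that contain commas, where Python's standard csv module parses correctly:

```python
import csv
import io

line = '10107,30,95.7,"Mini bike, red",Shipped'

# naive split breaks the quoted field apart:
print(line.split(','))
# ['10107', '30', '95.7', '"Mini bike', ' red"', 'Shipped']

# csv.reader handles the quoting:
print(next(csv.reader(io.StringIO(line))))
# ['10107', '30', '95.7', 'Mini bike, red', 'Shipped']
```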

Once our classes are ready, we define the pipeline, starting with the Pipeline() function and storing it in a variable. Each transformation is initiated with a '|' sign. The first transformation reads the data, and the last one writes it out. The ParDo transform is situated between these two, utilizing the SplitRow class as the DoFn to process each element in the PCollection. Thus, each row from the CSV will be split according to our defined function.

Here’s the complete pipeline code:

import apache_beam as beam

p = beam.Pipeline()

read_write = (p
    | beam.io.ReadFromText('data/sales_data_sample.csv', coder=CustomCoder(), skip_header_lines=1)
    | beam.ParDo(SplitRow())
    | beam.io.WriteToText('data/output')
)

p.run()

To check the output:

head -n 5 data/output-00000-of-00001

The output will look something like this:

['10107', '30', '95.7', ...]

['10121', '34', '81.35', ...]

...

Next, we’ll move to a more intricate example: counting the number of motorcycle orders in each status. This requires a few additional DoFn classes on top of the SplitRow class defined earlier.

The filterMotorcycles class filters the dataset, retaining only the rows with "Motorcycles" in the PRODUCTLINE column (index 10, counting columns from zero).

class filterMotorcycles(beam.DoFn):
    def process(self, element):
        if element[10] == "Motorcycles":
            return [element]
        # returning nothing drops the element from the PCollection
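Because process may return nothing, a DoFn doubles as a filter: when the condition fails, the implicit return None is treated as zero outputs and the element is dropped. A plain-Python sketch of that logic (the rows here are placeholders):

```python
def filter_motorcycles(element):
    # keep the row only when PRODUCTLINE (index 10) matches
    if element[10] == "Motorcycles":
        return [element]
    # implicit None -> treated as zero outputs, i.e. the row is dropped

kept = filter_motorcycles([''] * 10 + ['Motorcycles'])
dropped = filter_motorcycles([''] * 10 + ['Classic Cars'])
print(kept is not None)   # True: one output
print(dropped)            # None: zero outputs
```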

Next, we define the PairStatus class, which concatenates the PRODUCTLINE column (index 10) with the STATUS column (index 6) into a single string, then emits a tuple with that string as the key and 1 as the value.

class PairStatus(beam.DoFn):
    def process(self, element):
        return [(element[10] + ', ' + element[6], 1)]
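Applied to an already-split row, this produces one key-value pair per order line. The sample row below mirrors the dataset's column order (STATUS at index 6, PRODUCTLINE at index 10), but the specific values are made up for illustration:

```python
def pair_status(element):
    # concatenate PRODUCTLINE (index 10) and STATUS (index 6), pair with 1
    return [(element[10] + ', ' + element[6], 1)]

row = ['10107', '30', '95.7', '2', '2871.0', '2/24/2003',
       'Shipped', '1', '2', '2003', 'Motorcycles']
print(pair_status(row))  # [('Motorcycles, Shipped', 1)]
```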

Finally, we implement the Counting class to sum the values associated with each key.

class Counting(beam.DoFn):
    def process(self, element):
        # after GroupByKey, element is a (key, iterable of 1s) pair
        (k, v) = element
        return [(k, sum(v))]
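After GroupByKey, each element is a (key, values) pair where values is an iterable of the 1s emitted by PairStatus, so summing it yields the count. A plain-Python sketch of that step:

```python
def counting_process(element):
    # element arrives as (key, iterable_of_ones) after GroupByKey
    k, v = element
    return [(k, sum(v))]

grouped = ('Motorcycles, Shipped', [1, 1, 1])
print(counting_process(grouped))  # [('Motorcycles, Shipped', 3)]
```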

Now we start a second pipeline by calling beam.Pipeline() again. After reading the data and applying SplitRow, filterMotorcycles drops every non-motorcycle row, PairStatus emits a ('Motorcycles, <status>', 1) pair for each remaining row, GroupByKey gathers the 1s under each key, and Counting sums them to produce the count of motorcycles per status.

Here is the complete pipeline:

p2 = beam.Pipeline()

shipped_count = (p2
    | beam.io.ReadFromText('data/sales_data_sample.csv', coder=CustomCoder(), skip_header_lines=1)
    | beam.ParDo(SplitRow())
    | beam.ParDo(filterMotorcycles())
    | beam.ParDo(PairStatus())
    | beam.GroupByKey()
    | beam.ParDo(Counting())
    | beam.io.WriteToText('data/output2')
)

p2.run()

To verify the results, you can again use:

head -n 5 data/output2-00000-of-00001

The output will summarize the counts of motorcycles in different statuses:

('Motorcycles, Shipped', 324)

('Motorcycles, Disputed', 6)

('Motorcycles, On Hold', 1)

Conclusion

In this tutorial, I aimed to provide practical examples of how ParDo operates in Apache Beam. I trust that the examples and explanations are clear and informative.

For more updates, feel free to follow me on Twitter and like my Facebook page.

Further Reading

  • Map, Filter, and CombinePerKey Transforms in Writing Apache Beam Pipelines with Examples | by Rashida Nasrin Sucky | Jul, 2023 | Towards Data Science
  • A Detailed Tutorial on Polynomial Regression in Python, Overview, Implementation, and Overfitting | by Rashida Nasrin Sucky | Jun, 2023 | Towards AI
  • Using a Keras Tuner for Hyperparameter Tuning of a TensorFlow Model | by Rashida Nasrin Sucky | Jul, 2023 | Towards AI
  • Is ChatGPT Taking Away Human Jobs? | by Rashida Nasrin Sucky | Medium
  • Complete Implementation of a Mini VGG Network for Image Recognition | by Rashida Nasrin Sucky | Towards Data Science
  • Easy Method of Edge Detection in OpenCV Python | by Rashida Nasrin Sucky | Towards Data Science

