Spark Starter Guide 4.3: How to Deduplicate Data

Previous post: Spark Starter Guide 4.2: How to Create a Spark Session

Introduction

The process of removing duplicate records from a collection of data is called deduplication. It is often necessary early in a data pipeline, before the data is analyzed further, and it is typically associated with the Transform stage of ETL (though it is common in the Load stage as well). Spark provides a reliable built-in function on the DataFrame object that does exactly that: dropDuplicates().

It looks like this in practice, in Scala and Python respectively:

// scala
val deduplicatedDataFrame = dataFrame.dropDuplicates()
# python
deduplicatedDataFrame = dataFrame.dropDuplicates()

Cleaning data is a necessary step in making it more useful. It can be as simple as removing duplicate entries or filtering out unwanted information. Thankfully, Spark provides a suite of handy functions that make the process straightforward. Let’s demonstrate the built-in deduplication functionality of Spark by applying it to a simple set of data.
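Before turning to Spark itself, it can help to see what deduplication means on plain data. The following is a minimal pure-Python sketch (no Spark involved) of the same idea that dropDuplicates() implements: keep one copy of each distinct row. Note that Spark performs this work in a distributed fashion and does not guarantee output order, whereas this sketch happens to preserve first-occurrence order.

```python
# Pure-Python illustration of row deduplication (no Spark required).
# Each "row" is a tuple; dict.fromkeys keeps the first copy of each
# distinct row while preserving insertion order.
rows = [("dog", "pet"), ("cat", "pet"), ("bear", "wild"),
        ("cat", "pet"), ("cat", "pet")]

deduplicated = list(dict.fromkeys(rows))
print(deduplicated)  # [('dog', 'pet'), ('cat', 'pet'), ('bear', 'wild')]
```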

In the following exercise, we will learn how to deduplicate data in a Spark DataFrame.


Follow these steps to complete the exercise in SCALA:

Import additional relevant Spark libraries using the following code:

import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

Create a Sequence of Rows, where each Row contains an animal and its category, using the following code:

val categorized_animals = Seq(Row("dog", "pet"),
                              Row("cat", "pet"),
                              Row("bear", "wild"),
                              Row("cat", "pet"),
                              Row("cat", "pet"))

Create a schema that corresponds to the data using the following code:

val schema = List(
  StructField("name", StringType, nullable = true),
  StructField("category", StringType, nullable = true)
)

Use the parallelize() function of the SparkContext to turn that Sequence into an RDD, as shown in the following code:

val animalDataRDD = spark.sparkContext.parallelize(categorized_animals)
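As a rough mental model, parallelize() takes a local collection and splits it into partitions that Spark distributes across the cluster. This plain-Python sketch illustrates the partitioning idea only; it is not Spark's actual implementation, and the helper name `partition` is made up for illustration:

```python
# Illustrative sketch of splitting a local collection into partitions,
# mimicking the idea behind parallelize() (not Spark's real code).
def partition(data, num_partitions):
    """Split `data` into `num_partitions` roughly equal contiguous chunks."""
    size = len(data)
    return [data[i * size // num_partitions:(i + 1) * size // num_partitions]
            for i in range(num_partitions)]

animals = [("dog", "pet"), ("cat", "pet"), ("bear", "wild"),
           ("cat", "pet"), ("cat", "pet")]
print(partition(animals, 2))
```

Each chunk would then be processed independently by a Spark executor.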

Create a DataFrame from the RDD and schema created, and print the results to console using the following code:

val animalData = spark.createDataFrame(animalDataRDD, StructType(schema))
animalData.show()

The following is the output of the preceding code:

+----+--------+
|name|category|
+----+--------+
| dog|     pet|
| cat|     pet|
|bear|    wild|
| cat|     pet|
| cat|     pet|
+----+--------+

From the preceding output, you can see that the “cat, pet” row appears three times. We will remove the duplicates in the next step using the dropDuplicates() method.

Drop the duplicate rows and print the results to the console using the following code:

val deduped = animalData.dropDuplicates()
deduped.show()

The following is the output of the preceding code:

+----+--------+
|name|category|
+----+--------+
|bear|    wild|
| dog|     pet|
| cat|     pet|
+----+--------+

Follow these steps to complete the exercise in PYTHON:

Create a list of tuples containing an animal and its category using the following code:

categorized_animals = [("dog", "pet"), ("cat", "pet"), ("bear", "wild"), ("cat", "pet"), ("cat", "pet")]

Use the parallelize() function of the SparkContext to turn that list into an RDD using the following code:

animalDataRDD = spark.sparkContext.parallelize(categorized_animals)

Create a DataFrame from the RDD and print the results to the console using the following code:

animalsDF = spark.createDataFrame(animalDataRDD, ['name', 'category'])
animalsDF.show()

The following is the output of the preceding code:

+----+--------+
|name|category|
+----+--------+
| dog|     pet|
| cat|     pet|
|bear|    wild|
| cat|     pet|
| cat|     pet|
+----+--------+

From the preceding output, we can see that all the animals are categorized and that the “cat, pet” row appears three times. In the next step, we will use the dropDuplicates() method to remove the duplicates.

Drop the duplicate rows and print the results to the console using the following code:

deduplicated = animalsDF.dropDuplicates()
deduplicated.show()

The following is the output of the preceding code:

+----+--------+
|name|category|
+----+--------+
|bear|    wild|
| dog|     pet|
| cat|     pet|
+----+--------+

When you run the final step, you will see that the duplicate entries have been removed.
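It is worth noting that dropDuplicates() can also accept a subset of column names, in which case rows count as duplicates if they match on just those columns, and Spark keeps one row per distinct key. For example, animalsDF.dropDuplicates(['name']) would deduplicate on the name column alone. The logic is conceptually similar to this plain-Python sketch (illustrative only, not PySpark; the extra “cat, stray” row is a hypothetical addition to show the effect):

```python
# Illustrative sketch of column-subset deduplication (plain Python, not Spark):
# keep one row per distinct value of the "name" column (index 0).
rows = [("dog", "pet"), ("cat", "pet"), ("bear", "wild"),
        ("cat", "stray")]  # hypothetical extra row: same name, new category

seen = set()
deduped = []
for row in rows:
    key = row[0]  # deduplicate on the first column ("name") only
    if key not in seen:
        seen.add(key)
        deduped.append(row)
print(deduped)  # [('dog', 'pet'), ('cat', 'pet'), ('bear', 'wild')]
```

Which of the matching rows Spark retains is not guaranteed, so subset-based deduplication is best used when the non-key columns are interchangeable.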

In the next section, we will see the importance of filtering data and how it can be done.
