Spark Starter Guide 4.7: How to Standardize Data

Previous post: Spark Starter Guide 4.6: How to Aggregate Data

Introduction

Standardization is the practice of analyzing columns of data and identifying synonyms, or alternate names, for the same item. Just as a cat can also be identified as a kitty, kitty cat, kitten, or feline, we might want to standardize all of those entries into simply “cat” so our data is less messy and more organized. This can make future processing of the data more streamlined and less complicated. It can also reduce skew, which we address in Addressing Data Cardinality and Skew.

We will learn how to standardize data in the following exercises.


NOTE

From this point onward, code examples may utilize a line of code like this:

import spark.implicits._

This enables implicit conversions of Scala objects like RDDs into modern Spark abstractions like Datasets, DataFrames, and Columns. It also enables conveniences like the $ column shorthand, which produces Column objects supporting methods such as isin() (checking for values in a list) and desc() (descending-order sorting).
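For example, with the import in scope, a local Scala collection can be converted straight to a DataFrame, and the $ shorthand produces Column objects. A minimal sketch, assuming a SparkSession named spark is already in scope (the data here is purely illustrative):

import spark.implicits._

// toDF() is available on local collections thanks to the implicits
val df = Seq(("annabelle", 4), ("rosco", 7)).toDF("name", "age")

// $"age" creates a Column, whose desc() method sorts in descending order
df.sort($"age".desc).show()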


Two Types of Standardization

There are at least two basic ways to standardize something:

  1. You can recognize when two things are the same but literally different (“puppy” and “dog”) and associate them without actually changing anything.

  2. You can recognize when two things are the same and change them to be consistent (change instances of “puppy” to “dog”).

We’ll show how to do both in Spark, in both Scala and Python. Each approach involves some form of synonym library, sketched below.
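At its simplest, a synonym library can be just a lookup table mapping each variant to its standard term. Here is a minimal Scala sketch of the idea (the mappings are illustrative, not exhaustive):

// Map each known synonym to its standard term
val synonyms = Map(
  "puppy" -> "dog",
  "puppy dog" -> "dog",
  "hound" -> "dog",
  "kitty" -> "cat",
  "kitten" -> "cat",
  "feline" -> "cat"
)

// Fall back to the original term when no standard form is known
def standardizeTerm(term: String): String = synonyms.getOrElse(term, term)

standardizeTerm("kitten") // "cat"
standardizeTerm("horse")  // "horse" (unchanged)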

Standardization through Suggestion

Exercise Setup

Follow these steps to complete the exercise in SCALA:

Import additional relevant Spark libraries using the following code:

import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

Import spark.implicits, which enables handy operations (like the $ column shorthand used in a later step), using the following code:

import spark.implicits._

Create a Sequence of Rows, each containing an animal name and type, using the following code:

val my_previous_pets = Seq(Row("annabelle", "cat"),
                           Row("daisy", "kitten"),
                           Row("roger", "puppy"),
                           Row("joe", "puppy dog"),
                           Row("rosco", "dog"),
                           Row("julie", "feline"))

Create a schema that corresponds to the data using the following code:

val schema = List(
  StructField("nickname", StringType, nullable = true),
  StructField("type", StringType, nullable = true)
)

Use the parallelize() function of Spark to turn that Sequence into an RDD as shown in the following code:

val petsRDD = spark.sparkContext.parallelize(my_previous_pets)

Create a DataFrame from the RDD and the schema created above, using the following code:

val petsDF = spark.createDataFrame(petsRDD, StructType(schema))

Use the where() function of the DataFrame in combination with the isin() method of the Column class to keep only the rows whose type matches a provided list of dog nouns. Print the results to the console as shown in the following code:

val dogs = petsDF
  .where($"type".isin("dog", "puppy", "puppy dog", "hound", "canine"))

dogs.show()

The following is the output of the preceding code:

+--------+---------+
|nickname|     type|
+--------+---------+
|   roger|    puppy|
|     joe|puppy dog|
|   rosco|      dog|
+--------+---------+

Use the where() function of the DataFrame in combination with the isin() method of the Column class to keep only the rows whose type matches a provided list of cat nouns. Print the results to the console as shown in the following code:

val cats = petsDF
  .where($"type".isin("cat", "kitty", "kitten", "feline", "kitty cat"))

cats.show()

The following is the output of the preceding code:

+---------+------+
| nickname|  type|
+---------+------+
|annabelle|   cat|
|    daisy|kitten|
|    julie|feline|
+---------+------+

As we can see, we were able to use custom standardization logic to decide when a cat is a cat and a dog is a dog, even when their given types are inconsistent. And, generally speaking, comparing a column against a list of strings with isin() is relatively efficient at scale.


Follow these steps to complete the exercise in PYTHON:

Import additional relevant Spark libraries using the following code:

from pyspark.sql import Row
from pyspark.sql.functions import col

Create a List of Rows, each containing a name and type, using the following code:

my_previous_pets = [Row("annabelle", "cat"),
                    Row("daisy", "kitten"),
                    Row("roger", "puppy"),
                    Row("joe", "puppy dog"),
                    Row("rosco", "dog"),
                    Row("julie", "feline")]

Use the parallelize() function of Spark to turn that List into an RDD as shown in the following code:

petsRDD = sc.parallelize(my_previous_pets)

Create a DataFrame from the RDD and a provided schema using the following code:

petsDF = spark.createDataFrame(petsRDD, ['nickname', 'type'])

Use the where() function of the DataFrame in combination with the isin() method of the Column class to keep only the rows whose type matches a provided list of dog nouns. Print the results to the console as shown in the following code:

dogs = petsDF.where(
    col("type").isin("dog", "puppy", "puppy dog", "hound", "canine"))

dogs.show()

The following is the output of the preceding code:

+--------+---------+
|nickname|     type|
+--------+---------+
|   roger|    puppy|
|     joe|puppy dog|
|   rosco|      dog|
+--------+---------+

Use the where() function of the DataFrame in combination with the isin() method of the Column class to keep only the rows whose type matches a provided list of cat nouns. Print the results to the console as shown in the following code:

cats = petsDF.where(
    col("type").isin(["cat", "kitty", "kitten", "feline", "kitty cat"]))

cats.show()

This example also demonstrates that you can pass a list to the isin() function, not just comma-separated string values as demonstrated in the previous step.

The following is the output of the preceding code:

+---------+------+
| nickname|  type|
+---------+------+
|annabelle|   cat|
|    daisy|kitten|
|    julie|feline|
+---------+------+

We can see that all cats could be identified in the data set, even though they weren’t all labeled as type ‘cat’.

Standardization through Modification

In the previous exercise, we quietly identified animals as a certain type if their given type was found in a list of common synonyms for the standard type. In this exercise, we will actually modify the data to be standardized, replacing each synonymous type value with its preferred, standard alternative.


NOTE

From this point onward, further Scala code examples could, where appropriate, utilize a case class. This is a Scala-only abstraction that acts as a simple schema for structured, tabular-style data.

It’s an especially handy tool when paired with the Spark Dataset API, which can make powerful code even simpler to read and work with than DataFrames. A case class should be defined outside of the main() function in Scala, or imported from its own class; otherwise, you will encounter exceptions.

Example:

case class Person(name:String, age:Int)
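As a quick illustration of that pairing, here is a minimal sketch using the Person case class above with the Dataset API (assuming a SparkSession named spark and import spark.implicits._ are in scope; the data is purely illustrative):

// toDS() builds a typed Dataset[Person] from a local collection
val people = Seq(Person("jane", 34), Person("sam", 28)).toDS()

// Typed transformations can use plain Scala field access instead of column expressions
people.filter(_.age > 30).show()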


Exercise Setup

Follow these steps to complete the exercise in SCALA:

Import additional relevant Spark libraries using the following code:

import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

Then, create a case class called Pet, which contains two columns, nickname and petType, using the following code:

case class Pet(nickname: String, petType: String)

Using the petsDF created in the previous exercise, use the map() function of the DataFrame to compare the petType to a list of common dog and cat synonyms, returning “dog” or “cat”, respectively, if there is a match. If there is no match, return the unmodified petType (thus assuming it cannot be standardized), as shown in the following code:

val standardized_pets = petsDF.map(pet => {

  val nickname = pet.getString(pet.fieldIndex("nickname"))
  val petType = pet.getString(pet.fieldIndex("type"))

  // Map any known synonym to its standard type; otherwise keep the original value
  val standardType =
    if (Seq("dog", "puppy", "puppy dog", "hound", "canine").contains(petType)) {
      "dog"
    } else if (Seq("cat", "kitty", "kitten", "feline", "kitty cat").contains(petType)) {
      "cat"
    } else {
      petType
    }

  Pet(nickname, standardType)
})

Print the results to the console using the following code:

standardized_pets.show()

The following is the output of the preceding code:

+---------+-------+
| nickname|petType|
+---------+-------+
|annabelle|    cat|
|    daisy|    cat|
|    roger|    dog|
|      joe|    dog|
|    rosco|    dog|
|    julie|    cat|
+---------+-------+

As we can observe, the nickname column displays the names of the pets, while the petType column displays the kind of animal each one is, cat or dog, after passing through the standardization process.

Follow these steps to complete the exercise in PYTHON:

Create a standardize() function that compares the petType to a list of common dog and cat synonyms, returning “dog” or “cat”, respectively, if there is a match:

def standardize(pet):
    name = pet[0]
    animal_type = pet[1]

    if animal_type in ["dog", "puppy", "puppy dog", "hound", "canine"]:
        return name, "dog"
    elif animal_type in ["cat", "kitty", "kitten", "feline", "kitty cat"]:
        return name, "cat"
    else:
        # No match found: return the pet unmodified
        return pet

Then, apply the standardize() function to petsRDD (created in the previous exercise) using the map() function, create a DataFrame from the result, and print it to the console as shown in the following code. Hint: you can also use a UDF on the DataFrame instead of this RDD map approach, but we’ll cover that in a future exercise!

standardizedPets = petsRDD.map(standardize)
standardizedPetsDF = spark.createDataFrame(standardizedPets, ['nickname', 'type'])

standardizedPetsDF.show()

The following is the output of the preceding code:

+---------+----+
| nickname|type|
+---------+----+
|annabelle| cat|
|    daisy| cat|
|    roger| dog|
|      joe| dog|
|    rosco| dog|
|    julie| cat|
+---------+----+

We can see that annabelle, daisy, and julie are identified as cats, while roger, joe, and rosco are identified as dogs, even though their original type labels were not strictly “cat” or “dog”.

In this exercise, we learned how to use custom standardization logic in Spark to identify similar animals, even when their given types differed.

In the next section, we’ll cover Ordering & Sorting – useful for when you want to convey… well, order! See you then!
