Introduction

Real-world datasets are rarely clean and pristine. They commonly include blanks, nulls, duplicates, errors, malformed text, mismatched data types, and a host of other problems that degrade data quality. No matter how much data you have, a small amount of high-quality data is more valuable than a large amount of garbage data, and every decision derived from data improves as the quality of that data improves.

In this section we introduce some of the methods and techniques that Spark offers for dealing with "dirty data": data that needs to be improved so that the decisions made from it will be more accurate. Dirty data, and how to deal with it, is a broad topic with many considerations. This chapter introduces the problem, demonstrates the relevant Spark techniques, and explains the consequences of "fixing" dirty data.

Dirty data means data that needs to be improved so the decisions made from the data will be more accurate

Before we can deal with dirty data we first need to make some dirty data. In this section we will manually construct a DataFrame of names, addresses, and phone numbers that we intentionally make dirty by duplicating values, adding blank values, and including null values. The main reason we build the dirty data manually is so the results fit easily on a page of this book and you can clearly see what Spark is accomplishing.

Making Dirty Data

We will create a nested list in Python and a list of tuples in Scala to serve as the data for our DataFrames. We are using small data so we can show the results of the operations on the page of this book. In the dirty data set that we manually create below, there are several rows that contain null values, several duplicated values, and one entire row that is duplicated.

For the PySpark DataFrame we use a nested Python list of ten rows of data. The data includes names, addresses, and phone numbers. We then call the createDataFrame() method, passing the example_data variable as the first parameter and a Python list of column names as the second parameter.

example_data = [
  ["Stephanie", "Smith", "777 Brockton Avenue Abington, MA 2351", "(340) 977-9288"]
  , ["Ike", "Dodge", "8478 Mill Pond Rd. Desoto, TX 75115", "(545) 236-9396"]
  , ["Lacie", "Culbreath", None, "(782) 339-8539"]
  , ["Daniel", "Towles", "45 Water Street Cartersville, GA 30120", "(699) 791-0320"]
  , ["Sheena", None, "399 Pierce Street La Vergne, TN 37086", None]
  , ["Joe", "Vigna", "3 Armstrong Street Malvern, PA 19355",None]
  , ["Daniel", "Towles", "45 Water Street Cartersville, GA 30120", "(699) 791-0320"]
  , ["Stephanie", "Smith", "500 East Main St. Carlsbad, NM 54986", "(897) 455-1312"]
  , ["Gary", "Cliff", "", ""]
  , ["Jasmine", "French", "76 S. Lafayette Ave. Cupertino, CA 95014", "(385) 456-9823"]
]

df = spark.createDataFrame(example_data, ["first_name", "last_name", "address", "phone_number"])
df.show(20, False)

For the Spark Scala DataFrame we use a Scala list of tuples containing the same data as the Python list. We call the createDataFrame() method on the SparkSession variable and pass the example_data variable as the only parameter. To add column names (without defining a schema) use the toDF() method with comma-separated column names.

val example_data = List(
  ("Stephanie", "Smith", "777 Brockton Avenue Abington, MA 2351", "(340) 977-9288")
  , ("Ike", "Dodge", "8478 Mill Pond Rd. Desoto, TX 75115", "(545) 236-9396")
  , ("Lacie", "Culbreath", null, "(782) 339-8539")
  , ("Daniel", "Towles", "45 Water Street Cartersville, GA 30120", "(699) 791-0320")
  , ("Sheena", null, "399 Pierce Street La Vergne, TN 37086", null)
  , ("Joe", "Vigna", "3 Armstrong Street Malvern, PA 19355",null)
  , ("Daniel", "Towles", "45 Water Street Cartersville, GA 30120", "(699) 791-0320")
  , ("Stephanie", "Smith", "500 East Main St. Carlsbad, NM 54986", "(897) 455-1312")
  , ("Gary", "Cliff", "", "")
  , ("Jasmine", "French", "76 S. Lafayette Ave. Cupertino, CA 95014", "(385) 456-9823")
)

val df = spark.createDataFrame(example_data).toDF("first_name", "last_name", "address", "phone_number")
df.show(20, false)

The output of both the PySpark and Scala DataFrames is below.

+----------+---------+----------------------------------------+--------------+
|first_name|last_name|address                                 |phone_number  |
+----------+---------+----------------------------------------+--------------+
|Stephanie |Smith    |777 Brockton Avenue Abington, MA 2351   |(340) 977-9288|
|Ike       |Dodge    |8478 Mill Pond Rd. Desoto, TX 75115     |(545) 236-9396|
|Lacie     |Culbreath|null                                    |(782) 339-8539|
|Daniel    |Towles   |45 Water Street Cartersville, GA 30120  |(699) 791-0320|
|Sheena    |null     |399 Pierce Street La Vergne, TN 37086   |null          |
|Joe       |Vigna    |3 Armstrong Street Malvern, PA 19355    |null          |
|Daniel    |Towles   |45 Water Street Cartersville, GA 30120  |(699) 791-0320|
|Stephanie |Smith    |500 East Main St. Carlsbad, NM 54986    |(897) 455-1312|
|Gary      |Cliff    |                                        |              |
|Jasmine   |French   |76 S. Lafayette Ave. Cupertino, CA 95014|(385) 456-9823|
+----------+---------+----------------------------------------+--------------+

Removing Duplicate Records

One of the first things to look for when cleaning data is duplicate records. Duplicated rows could be a mistake, or they could be a natural consequence of the process that originally generated the data. Regardless, if duplicated rows need to be removed, Spark's dropDuplicates() method makes it easy to remove duplicated rows or values from a DataFrame.

The dropDuplicates() method is a very powerful operation that returns a new DataFrame with duplicated rows removed. If the dropDuplicates() method is given no parameters then it searches the DataFrame for complete rows that are exactly the same: Spark checks every cell in a row to ensure that both rows match exactly. However, Spark does not provide options for removing the first or the last occurrence of a duplicate row. If column names are supplied to the dropDuplicates() method, then Spark only searches those columns for exact matches. But take caution before using the dropDuplicates() method. In our examples, the DataFrame only has ten rows and we can clearly see that the duplicated rows are acceptable to remove. If our DataFrame had a billion or more rows, it would be much more difficult to determine whether duplicated records are acceptable for the given dataset.

Now let's look at an example with the DataFrame of names and addresses.

The following PySpark code searches all the rows of the DataFrame and removes any row that is an exact copy of another row. The resulting DataFrame is saved using a different variable name and the results are displayed. 

removed_dups = df.dropDuplicates()
removed_dups.show()

The following Spark Scala code removes duplicated rows from the entire DataFrame and saves the resulting DataFrame under a different variable name and displays the results.

val removed_dups = df.dropDuplicates()
removed_dups.show()

The output below is the resulting DataFrame from both the PySpark and Scala examples. 

+----------+---------+--------------------+--------------+
|first_name|last_name|             address|  phone_number|
+----------+---------+--------------------+--------------+
|    Sheena|     null|399 Pierce Street...|          null|
|    Daniel|   Towles|45 Water Street C...|(699) 791-0320|
|      Gary|    Cliff|                    |              |
|       Ike|    Dodge|8478 Mill Pond Rd...|(545) 236-9396|
|   Jasmine|   French|76 S. Lafayette A...|(385) 456-9823|
|     Lacie|Culbreath|                null|(782) 339-8539|
| Stephanie|    Smith|777 Brockton Aven...|(340) 977-9288|
| Stephanie|    Smith|500 East Main St....|(897) 455-1312|
|       Joe|    Vigna|3 Armstrong Stree...|          null|
+----------+---------+--------------------+--------------+

In this example there was one exactly duplicated row, the "Daniel Towles" row, and the dropDuplicates() method performed exactly as expected. In one simple operation all of the fully duplicated rows are removed from the DataFrame.

Another thing to notice is the order of the rows in the resulting DataFrame. Did you notice that the values are not in the same order? That is because the dropDuplicates() method creates a new DataFrame behind the scenes, regardless of whether it is saved under a new variable name, and producing it shuffles the distributed data. Because the data is distributed, the order in which the rows are displayed to us isn't always the same.
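
If a stable display order matters, one option (shown here purely as an illustration, not a required step) is to sort the de-duplicated DataFrame explicitly:

# Sort the de-duplicated result so the displayed order is deterministic
removed_dups.orderBy("first_name", "last_name").show(20, False)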

Removing Duplicate Rows with a Subset of Columns

Instead of searching for duplicates by matching data in every cell, the dropDuplicates() method can also be applied to a subset of DataFrame columns. This operation still removes the entire matching row(s) from the DataFrame but the number of columns it searches for duplicates in is reduced from all the columns to the subset of column(s) provided by the user.

In PySpark, to search for duplicate values by a subset of columns, the optional parameter takes a list of string column names. In Spark Scala, the dropDuplicates() method can receive either an array of string column names or just the column names themselves without using an array.

In the example DataFrame, let’s say we want to have only one unique first name and last name combination. We can limit the dropDuplicates() method to only search for identical entries in these two name columns. 

In PySpark, use a Python list of column names in the dropDuplicates() method to search for duplicates in. Here the list includes the "first_name" and "last_name" columns.

df.dropDuplicates(["first_name", "last_name"]).show()

In Spark Scala there are two options. In the first option below, we use an array to delineate the column names and in the second example we pass comma separated column strings. 

df.dropDuplicates(Array("first_name", "last_name")).show()

df.dropDuplicates("first_name", "last_name").show()

Output:

+----------+---------+--------------------+--------------+
|first_name|last_name|             address|  phone_number|
+----------+---------+--------------------+--------------+
|    Sheena|     null|399 Pierce Street...|          null|
|     Lacie|Culbreath|                null|(782) 339-8539|
|   Jasmine|   French|76 S. Lafayette A...|(385) 456-9823|
| Stephanie|    Smith|777 Brockton Aven...|(340) 977-9288|
|       Joe|    Vigna|3 Armstrong Stree...|          null|
|      Gary|    Cliff|                    |              |
|       Ike|    Dodge|8478 Mill Pond Rd...|(545) 236-9396|
|    Daniel|   Towles|45 Water Street C...|(699) 791-0320|
+----------+---------+--------------------+--------------+

All of the commands above produce the same DataFrame. Because we limited the dropDuplicates() method to search by the two name columns, two rows were removed. The "Daniel Towles" row was removed as before, but so was one of the "Stephanie Smith" rows. This happened because there were two "Stephanie Smith" rows with different addresses and phone numbers. Be very careful when dropping duplicates with a subset of columns, because Spark might remove a row you wanted to keep. Based upon the information we gave Spark (the "first_name" and "last_name" columns) it did exactly what we asked. In a real-life dataset we would not know whether having two "Stephanie Smith" rows was a mistake; it could be an error, or there could simply be two people named Stephanie Smith, which is perfectly acceptable.
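
Before dropping duplicates on a subset of columns, it can help to first inspect which key combinations actually occur more than once. The following PySpark snippet is one possible check (not part of the exercise):

# Count rows per first_name/last_name pair and keep only the repeated ones
df.groupBy("first_name", "last_name").count().filter("count > 1").show()

Any combination listed by this check would lose at least one row when dropDuplicates() is applied to those columns.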

Dropping Null Values

After learning how to drop duplicated rows from a DataFrame, the next thing is to remove null values. But what is a null value? In Spark a null is a special value (Spark even has a NullType for untyped nulls) that literally means "none": it represents data about which nothing useful is known. It is different from blank data or even the space character. The null value exists because computer systems need to return something when errors occur or when values become unassigned. It is a safeguard that keeps the data orderly and outcomes predictable.

The output of a null in both Spark Scala and PySpark is the value null. It is not a string with quotes; it is Spark's representation of a missing value. To manually specify a null value in Spark Scala, use the characters null without quotes, because it is not a string. In PySpark it is specified differently, with the characters None without quotes. You may not have noticed it at the beginning of this section when we created our dirty DataFrame: the PySpark example used None while the Spark Scala example used null, but in both cases the value in the resulting DataFrame evaluated to null.
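
To see the difference between a null and a blank in practice, here is a quick PySpark check against the DataFrame built above (purely illustrative):

from pyspark.sql.functions import col

# Lacie's address is a true null, while Gary's address is an empty string
df.filter(col("address").isNull()).show()
df.filter(col("address") == "").show()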

Drop Na

To remove null values from a Spark DataFrame use the dropna() method in PySpark or the na.drop() method in Spark Scala. Both methods have the same functionality, but in Scala the drop() method belongs to the DataFrameNaFunctions class, which is accessed by calling na on the DataFrame.

In PySpark the dropna() method works by removing rows from a DataFrame that contain null values. Each time the method is used a new DataFrame is constructed. Call the dropna() method on your DataFrame and either display the results or save the result as a new variable name. An example would look like:

df.dropna().show()

The dropna() method has three optional parameters, but at most two of them are used together. In PySpark the parameters are how, thresh, and subset. The parameters how and thresh are never used together. The parameter how is a string and takes only two options: if "any", the method removes a row if any (at least one) of its cells contains a null; if "all", the method removes a row only if all of its values are null. The second parameter, thresh, is short for "threshold" and is an integer. The dropna() method removes a row only if its count of non-null values is less than the number specified in thresh; stated another way, a row needs at least the threshold number of non-null values or it will be removed. The last parameter is subset, and it works with either how or thresh. The subset parameter is a list of columns that reduces the columns evaluated from every column in the DataFrame down to only those supplied in the list.

In Spark Scala the na.drop() method works the same way as the dropna() method in PySpark, but the parameter names are different. In Scala they are how, minNonNulls, and cols instead of PySpark's how, thresh, and subset.

Exercise 6: Dropping Nulls

In this exercise we will remove rows with null values from a DataFrame using the dropna() method in PySpark and the na.drop() method in Spark Scala.

  1. To drop any row that contains any null value, call the dropna() method on your DataFrame. If no parameters are supplied then it defaults to how = "any" which removes any row that contains a null value. The following commands are all equivalent and produce the same resulting DataFrame:

PySpark:

df.dropna().show()
df.dropna(how = "any").show()

Spark Scala:

df.na.drop().show()
df.na.drop(how = "any").show()

Output:

+----------+---------+--------------------+--------------+
|first_name|last_name|             address|  phone_number|
+----------+---------+--------------------+--------------+
| Stephanie|    Smith|777 Brockton Aven...|(340) 977-9288|
|       Ike|    Dodge|8478 Mill Pond Rd...|(545) 236-9396|
|    Daniel|   Towles|45 Water Street C...|(699) 791-0320|
|    Daniel|   Towles|45 Water Street C...|(699) 791-0320|
| Stephanie|    Smith|500 East Main St....|(897) 455-1312|
|      Gary|    Cliff|                    |              |
|   Jasmine|   French|76 S. Lafayette A...|(385) 456-9823|
+----------+---------+--------------------+--------------+
  2. To drop rows that have null values in every cell, use the parameter how = "all" as shown in the following code:

PySpark:

df.dropna(how = "all").show()

Spark Scala:

df.na.drop(how = "all").show()

Output: No rows are removed because there aren't any rows in which every value is null.

+----------+---------+--------------------+--------------+
|first_name|last_name|             address|  phone_number|
+----------+---------+--------------------+--------------+
| Stephanie|    Smith|777 Brockton Aven...|(340) 977-9288|
|       Ike|    Dodge|8478 Mill Pond Rd...|(545) 236-9396|
|     Lacie|Culbreath|                null|(782) 339-8539|
|    Daniel|   Towles|45 Water Street C...|(699) 791-0320|
|    Sheena|     null|399 Pierce Street...|          null|
|       Joe|    Vigna|3 Armstrong Stree...|          null|
|    Daniel|   Towles|45 Water Street C...|(699) 791-0320|
| Stephanie|    Smith|500 East Main St....|(897) 455-1312|
|      Gary|    Cliff|                    |              |
|   Jasmine|   French|76 S. Lafayette A...|(385) 456-9823|
+----------+---------+--------------------+--------------+
  3. Use either the subset parameter in PySpark or the cols parameter in Spark Scala in conjunction with the how parameter to limit the search for null values to specific columns. The following PySpark code removes rows with any nulls in the "first_name" or "last_name" columns:
df.dropna(how = "any", subset = ["first_name", "last_name"]).show()
+----------+---------+--------------------+--------------+
|first_name|last_name|             address|  phone_number|
+----------+---------+--------------------+--------------+
| Stephanie|    Smith|777 Brockton Aven...|(340) 977-9288|
|       Ike|    Dodge|8478 Mill Pond Rd...|(545) 236-9396|
|     Lacie|Culbreath|                null|(782) 339-8539|
|    Daniel|   Towles|45 Water Street C...|(699) 791-0320|
|       Joe|    Vigna|3 Armstrong Stree...|          null|
|    Daniel|   Towles|45 Water Street C...|(699) 791-0320|
| Stephanie|    Smith|500 East Main St....|(897) 455-1312|
|      Gary|    Cliff|                    |              |
|   Jasmine|   French|76 S. Lafayette A...|(385) 456-9823|
+----------+---------+--------------------+--------------+

The following Spark Scala code drops rows with any nulls in the "address" or "phone_number" columns:

df.na.drop("any", Array("address", "phone_number")).show()

Output:

+----------+---------+--------------------+--------------+
|first_name|last_name|             address|  phone_number|
+----------+---------+--------------------+--------------+
| Stephanie|    Smith|777 Brockton Aven...|(340) 977-9288|
|       Ike|    Dodge|8478 Mill Pond Rd...|(545) 236-9396|
|    Daniel|   Towles|45 Water Street C...|(699) 791-0320|
|    Daniel|   Towles|45 Water Street C...|(699) 791-0320|
| Stephanie|    Smith|500 East Main St....|(897) 455-1312|
|      Gary|    Cliff|                    |              |
|   Jasmine|   French|76 S. Lafayette A...|(385) 456-9823|
+----------+---------+--------------------+--------------+
  4. Use the parameter thresh = 3 in PySpark or minNonNulls = 3 in Spark Scala to keep only rows that contain at least three non-null values.

PySpark:

df.dropna(thresh = 3).show()

Spark Scala:

df.na.drop(minNonNulls = 3).show()

Output:

+----------+---------+--------------------+--------------+
|first_name|last_name|             address|  phone_number|
+----------+---------+--------------------+--------------+
| Stephanie|    Smith|777 Brockton Aven...|(340) 977-9288|
|       Ike|    Dodge|8478 Mill Pond Rd...|(545) 236-9396|
|     Lacie|Culbreath|                null|(782) 339-8539|
|    Daniel|   Towles|45 Water Street C...|(699) 791-0320|
|       Joe|    Vigna|3 Armstrong Stree...|          null|
|    Daniel|   Towles|45 Water Street C...|(699) 791-0320|
| Stephanie|    Smith|500 East Main St....|(897) 455-1312|
|      Gary|    Cliff|                    |              |
|   Jasmine|   French|76 S. Lafayette A...|(385) 456-9823|
+----------+---------+--------------------+--------------+

This exercise showed how effortless it is to remove null values from a DataFrame.

Filling Null Values

Above we learned how to remove entire rows that contain one or more null values. But maybe we don't want to remove any rows. Spark has a method that allows the user to fill or replace null values with any value: fillna() in PySpark and na.fill() in Spark Scala. Both methods work by searching all the columns, or a subset of the columns, for null values and replacing each null with a specified value. The method has one required parameter, value, which can be an integer, long, double, string, or boolean. The second, optional parameter, called subset in PySpark or cols in Spark Scala, accepts a Python list or a Scala array of columns to limit the search to.

It is important to note that the na.fill() method will only replace values in columns with the same data type as the one specified in the value parameter. For example, if we have all string columns in our DataFrame and we try to replace null values with the decimal 5.5 then no nulls will be replaced because all of the columns are string. This works for every other data type mismatch between column and replacement value. 

Exercise 7: Filling Null Values

  1. Use the PySpark method fillna() or the Spark Scala method na.fill() to replace all the null values in the DataFrame with the string “This was a null value!”: 

PySpark:

df.fillna("This was a null value!").show()

Spark Scala:

df.na.fill("This was a null value!").show()

Output:

+----------+--------------------+--------------------+--------------------+
|first_name|           last_name|             address|        phone_number|
+----------+--------------------+--------------------+--------------------+
| Stephanie|               Smith|777 Brockton Aven...|      (340) 977-9288|
|       Ike|               Dodge|8478 Mill Pond Rd...|      (545) 236-9396|
|     Lacie|           Culbreath|This was a null v...|      (782) 339-8539|
|    Daniel|              Towles|45 Water Street C...|      (699) 791-0320|
|    Sheena|This was a null v...|399 Pierce Street...|This was a null v...|
|       Joe|               Vigna|3 Armstrong Stree...|This was a null v...|
|    Daniel|              Towles|45 Water Street C...|      (699) 791-0320|
| Stephanie|               Smith|500 East Main St....|      (897) 455-1312|
|      Gary|               Cliff|                    |                    |
|   Jasmine|              French|76 S. Lafayette A...|      (385) 456-9823|
+----------+--------------------+--------------------+--------------------+
  2. We can limit the search for null values to specific column(s) by giving a Python list or a Scala array as the second optional parameter. In this example we want to replace every null value in the "phone_number" column with a dummy phone number. The first parameter is the dummy phone number string to replace null values with, and the second parameter is a Python list or a Scala array of column names to search in.

PySpark:

df.fillna("(555) 555-5555", ["phone_number"]).show()

Spark Scala:

df.na.fill("(555) 555-5555", Array("phone_number")).show()

Output:

+----------+---------+--------------------+--------------+
|first_name|last_name|             address|  phone_number|
+----------+---------+--------------------+--------------+
| Stephanie|    Smith|777 Brockton Aven...|(340) 977-9288|
|       Ike|    Dodge|8478 Mill Pond Rd...|(545) 236-9396|
|     Lacie|Culbreath|                null|(782) 339-8539|
|    Daniel|   Towles|45 Water Street C...|(699) 791-0320|
|    Sheena|     null|399 Pierce Street...|(555) 555-5555|
|       Joe|    Vigna|3 Armstrong Stree...|(555) 555-5555|
|    Daniel|   Towles|45 Water Street C...|(699) 791-0320|
| Stephanie|    Smith|500 East Main St....|(897) 455-1312|
|      Gary|    Cliff|                    |              |
|   Jasmine|   French|76 S. Lafayette A...|(385) 456-9823|
+----------+---------+--------------------+--------------+
  3. To demonstrate that Spark only replaces nulls in columns whose data type matches the replacement value, here is an example of attempting to replace null values with a decimal when all of the columns are strings. The output DataFrame will have no changes compared to the original DataFrame.

PySpark:

df.fillna(5.5).show()

Spark Scala:

df.na.fill(5.5).show()

Output:

+----------+---------+--------------------+--------------+
|first_name|last_name|             address|  phone_number|
+----------+---------+--------------------+--------------+
| Stephanie|    Smith|777 Brockton Aven...|(340) 977-9288|
|       Ike|    Dodge|8478 Mill Pond Rd...|(545) 236-9396|
|     Lacie|Culbreath|                null|(782) 339-8539|
|    Daniel|   Towles|45 Water Street C...|(699) 791-0320|
|    Sheena|     null|399 Pierce Street...|          null|
|       Joe|    Vigna|3 Armstrong Stree...|          null|
|    Daniel|   Towles|45 Water Street C...|(699) 791-0320|
| Stephanie|    Smith|500 East Main St....|(897) 455-1312|
|      Gary|    Cliff|                    |              |
|   Jasmine|   French|76 S. Lafayette A...|(385) 456-9823|
+----------+---------+--------------------+--------------+

As expected, we can see that the output has no changes because we tried replacing string values with a decimal value.

Replacing Values

In Spark we are not limited to working only with null values when cleaning DataFrames. Spark has a replace() method in PySpark, and a na.replace() method in Spark Scala, that will replace any value in a DataFrame with another value. This is a very powerful method that gives the user pinpoint precision when replacing values. However, the methods in PySpark and Spark Scala are quite different: the PySpark replace() method is easy to use and understand, while the Spark Scala na.replace() method is more cumbersome and less straightforward. Let's start with the PySpark method replace().

The PySpark method replace() has two required parameters, to_replace and value. Very simply, to_replace is the value you would like to replace in the DataFrame and value is the value you would like to replace it with. The allowed data types for the to_replace parameter include boolean, integer, long, float, string, and list. The data types for value are the same as for to_replace, with the addition of None, which means we can actually replace values in a DataFrame with nulls. The optional parameter subset limits the search to a list of column names.
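
For example, because value accepts None, blank strings can be converted into true nulls, and because both parameters accept lists, several values can be replaced in one call. The sketch below is illustrative only; the replacement names are made up, and the two lists must be the same length:

# Turn empty strings into real nulls in two columns
df.replace("", None, ["address", "phone_number"]).show()

# Replace several values at once: each item in the first list is replaced
# by the item at the same position in the second list (made-up values)
df.replace(["Smith", "Dodge"], ["Smyth", "Dodger"], ["last_name"]).show()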

Exercise 8: Replacing Values in PySpark

In this exercise we will replace one value in a DataFrame with another value using PySpark.

Imagine our DataFrame represented company employee data. On occasion, people change their names. The Spark replace method is a great way to update data in a DataFrame with precision. In this example we will update the “last_name” value for the row of “Jasmine French” to “Davidson”.

  1. Call the replace() method on our DataFrame with the first parameter "French", the value we would like to replace, and the second parameter "Davidson", the value we would like to replace it with. Since we know we are only searching within the "last_name" column, set the third parameter to a Python list containing the string "last_name".
df.replace("French", "Davidson", ["last_name"]).show()
+----------+---------+--------------------+--------------+
|first_name|last_name|             address|  phone_number|
+----------+---------+--------------------+--------------+
| Stephanie|    Smith|777 Brockton Aven...|(340) 977-9288|
|       Ike|    Dodge|8478 Mill Pond Rd...|(545) 236-9396|
|     Lacie|Culbreath|                null|(782) 339-8539|
|    Daniel|   Towles|45 Water Street C...|(699) 791-0320|
|    Sheena|     null|399 Pierce Street...|          null|
|       Joe|    Vigna|3 Armstrong Stree...|          null|
|    Daniel|   Towles|45 Water Street C...|(699) 791-0320|
| Stephanie|    Smith|500 East Main St....|(897) 455-1312|
|      Gary|    Cliff|                    |              |
|   Jasmine| Davidson|76 S. Lafayette A...|(385) 456-9823|
+----------+---------+--------------------+--------------+
  2. In addition to data types like strings and numbers, we can also search for blank values. If there were many blank values in our employee DataFrame we could search for "" and replace it with another, more useful string. In this example, set the to_replace value to "" and replace it with "Blank value", searching over every column.
df.replace("", "Blank value").show()
+----------+---------+--------------------+--------------+
|first_name|last_name|             address|  phone_number|
+----------+---------+--------------------+--------------+
| Stephanie|    Smith|777 Brockton Aven...|(340) 977-9288|
|       Ike|    Dodge|8478 Mill Pond Rd...|(545) 236-9396|
|     Lacie|Culbreath|                null|(782) 339-8539|
|    Daniel|   Towles|45 Water Street C...|(699) 791-0320|
|    Sheena|     null|399 Pierce Street...|          null|
|       Joe|    Vigna|3 Armstrong Stree...|          null|
|    Daniel|   Towles|45 Water Street C...|(699) 791-0320|
| Stephanie|    Smith|500 East Main St....|(897) 455-1312|
|      Gary|    Cliff|         Blank value|   Blank value|
|   Jasmine|   French|76 S. Lafayette A...|(385) 456-9823|
+----------+---------+--------------------+--------------+

The replace method is a great way to clean up a DataFrame without having to worry about unintended consequences, because you are searching for a very specific value.

Replacing Values in Spark Scala

In Spark Scala, to replace values we use a Scala Map(), a collection of key-value pairs similar to a Python dictionary, where -> separates each key from its value. In the na.replace() method the first parameter is the column or columns to search in, and the second parameter is a Map() whose keys are the values you are searching for and whose values are the replacements.

In this example we are searching for the last name “French” in the “last_name” column and replacing it with "Davidson".

df.na.replace("last_name", Map("French" -> "Davidson")).show()

In this example we are searching for blank values in two specific columns, “address” and “phone_number“. We search multiple columns by giving a Scala array in the first parameter.

df.na.replace(Array("address", "phone_number"), Map("" -> "Blank value")).show()

We can search all the columns by giving the "*" in the first parameter.

df.na.replace("*", Map("" -> "Blank value")).show()

Replacing Nulls with the Mean of Each Column

We have covered how to find null values and replace them with the na.fill() method, and how to replace any value with another value using the replace() method. Another common technique for number columns is to substitute any null values with the average value of the column. There is no built-in Spark method for this, but we can combine what we have covered so far with a few new techniques to accomplish it. Let's start with PySpark.

In Python there is a very useful built-in function called enumerate() that can be used inside a for loop. A for loop iterates over the elements of a range or list. For example, the Python code below loops over every number from zero to two and performs an operation, which in this case prints the variable i to the console.

for i in range(0,3):
  print(i)

Output:

0
1
2

The enumerate() function is given a list of elements and keeps track of each element in the list along with its index, a number representing the element's position in the list. Remember that Python lists are zero-based, which means they start counting at zero instead of one. The Python code below loops over the elements of a list and prints each element's value and index to the console.

for index, element in enumerate(["sales", "customer", "product"]):
  print(element + " - " + str(index))

Output:

sales - 0
customer - 1
product - 2

Before we can use enumerate(), let's add two columns, "age" and "salary", to our simple employee DataFrame. The code below adds two new entries to each row and creates a new DataFrame.

example_data_with_age = [
  ["Stephanie", "Smith", "777 Brockton Avenue Abington, MA 2351", "(340) 977-9288", 33, 35000]
  , ["Ike", "Dodge", "8478 Mill Pond Rd. Desoto, TX 75115", "(545) 236-9396", 35, 37000]
  , ["Lacie", "Culbreath", None, "(782) 339-8539", 47, 51000]
  , ["Daniel", "Towles", "45 Water Street Cartersville, GA 30120", "(699) 791-0320", 59, 60000]
  , ["Sheena", None, "399 Pierce Street La Vergne, TN 37086", None, 41, 48000]
  , ["Joe", "Vigna", "3 Armstrong Street Malvern, PA 19355",None, 48, 52000]
  , ["Daniel", "Towles", "45 Water Street Cartersville, GA 30120", "(699) 791-0320", 36, 37000]
  , ["Stephanie", "Smith", "500 East Main St. Carlsbad, NM 54986", "(897) 455-1312", 46, 50000]
  , ["Gary", "Cliff", "", "", None, None]
  , ["Jasmine", "French", "76 S. Lafayette Ave. Cupertino, CA 95014", "(385) 456-9823", 31, 32000]
]

employee_df = spark.createDataFrame(example_data_with_age, ["first_name", "last_name", "address", "phone_number", "age", "salary"])
employee_df.show(20, False)

Output:

+----------+---------+----------------------------------------+--------------+----+------+
|first_name|last_name|address                                 |phone_number  |age |salary|
+----------+---------+----------------------------------------+--------------+----+------+
|Stephanie |Smith    |777 Brockton Avenue Abington, MA 2351   |(340) 977-9288|33  |35000 |
|Ike       |Dodge    |8478 Mill Pond Rd. Desoto, TX 75115     |(545) 236-9396|35  |37000 |
|Lacie     |Culbreath|null                                    |(782) 339-8539|47  |51000 |
|Daniel    |Towles   |45 Water Street Cartersville, GA 30120  |(699) 791-0320|59  |60000 |
|Sheena    |null     |399 Pierce Street La Vergne, TN 37086   |null          |41  |48000 |
|Joe       |Vigna    |3 Armstrong Street Malvern, PA 19355    |null          |48  |52000 |
|Daniel    |Towles   |45 Water Street Cartersville, GA 30120  |(699) 791-0320|36  |37000 |
|Stephanie |Smith    |500 East Main St. Carlsbad, NM 54986    |(897) 455-1312|46  |50000 |
|Gary      |Cliff    |                                        |              |null|null  |
|Jasmine   |French   |76 S. Lafayette Ave. Cupertino, CA 95014|(385) 456-9823|31  |32000 |
+----------+---------+----------------------------------------+--------------+----+------+

Recall from section 1.6, DataFrame Aggregations, that we can use the groupBy() and avg() methods to return the average of each numeric column of a DataFrame. Also recall that to extract the values of a DataFrame we use the collect() method, which returns the values as a list of Row objects. An example would look like:

employee_df.groupBy().avg().collect()

Output:

[Row(avg(age)=41.77777777777778, avg(salary)=44666.666666666664)]

We can then select items in the list by index. Each Row object can be indexed just like a list, so essentially this operates like a nested list. To select the average of the first column, "age":

employee_df.groupBy().avg().collect()[0][0]

Output:

41.77777777777778

Lastly, we will use the fillna() method to find nulls and replace them. The key is its optional second parameter, which takes a list of columns to search in. We can supply a list containing only the number columns whose nulls we would like to replace with the average of each column.
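
As a stepping stone, filling just one column would look like the following (the variable name age_avg is only for illustration):

# Average of the "age" column: the first value returned by avg()
age_avg = employee_df.groupBy().avg().collect()[0][0]

# Replace nulls in "age" only; the double average is cast to the column's long type
employee_df.fillna(age_avg, ["age"]).show()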

Finally, we can combine the for loop, the enumerate() function, the groupBy() average, and the fillna() method to return a DataFrame in which each null is replaced with the average of its column. Let's put it all together.

The following code uses a list of the number columns to search in, ["age", "salary"], and uses the Python enumerate() function to get the index and value of each element in the list. We also create a new DataFrame variable named updated_df that is used throughout the operation. For each number column in the list we update the updated_df DataFrame by specifying one column at a time in the fillna() method. If there were more number columns to search for nulls in, the only change would be adding the new column(s) to the list. The final code looks like:

updated_df = employee_df

for index, element in enumerate(["age", "salary"]):
  updated_df = updated_df.fillna(updated_df.groupBy().avg().collect()[0][index], [element])

updated_df.show()

Output:

+----------+---------+--------------------+--------------+---+------+
|first_name|last_name|             address|  phone_number|age|salary|
+----------+---------+--------------------+--------------+---+------+
| Stephanie|    Smith|777 Brockton Aven...|(340) 977-9288| 33| 35000|
|       Ike|    Dodge|8478 Mill Pond Rd...|(545) 236-9396| 35| 37000|
|     Lacie|Culbreath|                null|(782) 339-8539| 47| 51000|
|    Daniel|   Towles|45 Water Street C...|(699) 791-0320| 59| 60000|
|    Sheena|     null|399 Pierce Street...|          null| 41| 48000|
|       Joe|    Vigna|3 Armstrong Stree...|          null| 48| 52000|
|    Daniel|   Towles|45 Water Street C...|(699) 791-0320| 36| 37000|
| Stephanie|    Smith|500 East Main St....|(897) 455-1312| 46| 50000|
|      Gary|    Cliff|                    |              | 41| 44666|
|   Jasmine|   French|76 S. Lafayette A...|(385) 456-9823| 31| 32000|
+----------+---------+--------------------+--------------+---+------+
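
Notice that because "age" and "salary" are long columns, the double averages are cast to the column type when they are filled in, which is why Gary's row shows 41 and 44666 rather than 41.78 and 44666.67. As an aside, fillna() also accepts a dictionary of column-to-value mappings, so the same result can be produced without a loop; a minimal alternative sketch (the variable names avgs and filled_df are only for illustration):

# Compute all of the column averages once, then fill each column from a dictionary
avgs = employee_df.groupBy().avg().collect()[0]
filled_df = employee_df.fillna({"age": avgs[0], "salary": avgs[1]})
filled_df.show()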

In Spark Scala the operation uses many of the same pieces, but we will not update the DataFrame in a single loop. Python lets us rebind the same variable name to a new DataFrame on each pass (the DataFrames themselves are still immutable; each operation creates a new DataFrame in the background, and we simply give the new DataFrame the same variable name). In Scala, a DataFrame declared with val cannot be reassigned, so each change must be saved under a new and unique variable name (alternatives such as a var or a foldLeft exist in Scala, but here we will keep each step explicit). For that reason we will replace nulls one numeric column at a time instead of looping as we did in PySpark.

In Spark Scala, the approach is to replace the nulls of each numeric column one column at a time, saving each step to a new DataFrame variable. We start by creating an array of the number columns in our DataFrame that we would like to update. In our case there are two:

val number_columns = Array("age", "salary")

We will index into this array to make the manual process a little easier. The operation in Scala looks very similar to the one we used in PySpark. We call the employee_df DataFrame and fill null values with na.fill(), and inside we groupBy() the avg() of the entire DataFrame. We use (0)(0) to select the first average value (which corresponds to the first column in our array) returned from the groupBy(). Then we cast the average value to a decimal with .asInstanceOf[Double] and limit the na.fill() to only the "age" column, because it is the first numeric column in our array. The first DataFrame operation looks like:

val df1 = employee_df.na.fill(employee_df.groupBy().avg().collect()(0)(0).asInstanceOf[Double], Array(number_columns(0)))

At this point if we displayed the df1 DataFrame all of the nulls in the “age” column would be replaced with the average of the entire column. To perform the same operation on the “salary” column we use a new DataFrame variable name, df2, and call the operation on the DataFrame df1. We want to use df1 so we can keep the update we made to the “age” column. After that the only difference is that we choose the second result of the avg() groupBy() with (0)(1) and choose the second element in the array. The next line of code would look like:

val df2 = df1.na.fill(df1.groupBy().avg().collect()(0)(1).asInstanceOf[Double], Array(number_columns(1)))

We could continue this technique of adding new DataFrames for as many numeric columns as needed. It is a manual process but it gets the job done. The final code and output would be:

val number_columns = Array("age", "salary")

val df1 = employee_df.na.fill(employee_df.groupBy().avg().collect()(0)(0).asInstanceOf[Double], Array(number_columns(0)))
val df2 = df1.na.fill(df1.groupBy().avg().collect()(0)(1).asInstanceOf[Double], Array(number_columns(1)))

df2.show()
+----------+---------+--------------------+--------------+---+------+
|first_name|last_name|             address|  phone_number|age|salary|
+----------+---------+--------------------+--------------+---+------+
| Stephanie|    Smith|777 Brockton Aven...|(340) 977-9288| 33| 35000|
|       Ike|    Dodge|8478 Mill Pond Rd...|(545) 236-9396| 35| 37000|
|     Lacie|Culbreath|                null|(782) 339-8539| 47| 51000|
|    Daniel|   Towles|45 Water Street C...|(699) 791-0320| 59| 60000|
|    Sheena|     null|399 Pierce Street...|          null| 41| 48000|
|       Joe|    Vigna|3 Armstrong Stree...|          null| 48| 52000|
|    Daniel|   Towles|45 Water Street C...|(699) 791-0320| 36| 37000|
| Stephanie|    Smith|500 East Main St....|(897) 455-1312| 46| 50000|
|      Gary|    Cliff|                    |              | 41| 44666|
|   Jasmine|   French|76 S. Lafayette A...|(385) 456-9823| 31| 32000|
+----------+---------+--------------------+--------------+---+------+

The Consequences of “fixing” Dirty Data

All real-world data is dirty. This chapter has shown several built-in Spark techniques for dealing with dirty data. Removing duplicates, dropping null values, filling null values, and replacing values are some of the many techniques that can be deployed to try to remedy dirty data. However, even if data is dirty that doesn’t mean it is unimportant or unusable. All data is information. By removing, replacing, or dropping data you are removing, replacing, and dropping information. That information, however dirty, is meaningful and should not be discarded or changed without careful consideration. The issue is not how much data to clean but how much information to keep in its original form.

All data is information. By removing, replacing, or dropping data you are removing, replacing, and dropping information.

Before attempting any data cleaning endeavors always answer the following two questions:

  • For this particular data set, what is my definition of dirty data?
  • Once dirty data has been found, for the particular data set, what should I do?

Unfortunately, no definitive answers can be given. All data cleaning operations require a detailed business understanding of the particular data set: its subject matter, the source of the data, how the data set was constructed, the significance of each column's data type, how the data is organized and formatted, what duplicated records or rows mean, what null values mean, and what happens to the data set when data is replaced or fixed. All of these are serious considerations that must be weighed before altering data.

For question one, "what is my definition of dirty data?": data is dirty when it does not add value to the data set or when its value is diminished. For a static data set, dirty data might be an entire row of nulls, or the same row repeated one hundred times. But again, it depends on the context of what the data is and how it was generated. For time-series data, where entries are keyed by time, repeated rows may be completely acceptable and add value.

Data is dirty when it does not add value to the data set

The answer to question two, "what should I do with dirty data when it is found?", depends on what you can reasonably justify removing or changing. Remember, the ultimate goal of any data set is to help make decisions, whether through data analysis or machine learning. When we alter a data set we are altering its information. If your job is to take one or more data sets, perform data analysis, and provide recommendations to management, cleaning data has serious implications: you could remove too much or change too little, and the final conclusions of your analysis could be dramatically affected by the data you did or didn't clean. A good rule to follow is: don't alter data unless you can reasonably justify the change or removal to others.

