Introduction

In the last chapter, the reader was introduced to Spark DataFrames and shown Spark commands for creating, processing, and working with them. In this chapter, we will accelerate into more complex and challenging topics like data cleaning, statistics, and ML Pipelines. Chapter 1, DataFrames with Spark, covered the “what” of Spark DataFrames: the commands to accomplish different tasks. Here, we will introduce the “why” and the “how”: asking why we are taking a given step, and thinking longer term about how the steps we take at the beginning of a data problem impact us at the end.

This chapter will shift the focus from learning individual commands to accomplishing a larger goal. We will cover topics the reader will encounter in production systems, like writing files in partitions, machine learning pipelines, and the advantages of using Datasets. These skills are important because they are used to solve real-world problems. For example, companies write data into partitions to segment constantly updated data so it is easier to query and manage. Likewise, machine learning isn’t applied to raw, arbitrary data; the data needs to be cleaned and then transformed into formats that the machine learning algorithm can use.

DataFrame Data Analysis

In the previous chapter we introduced how to create Spark DataFrames and many of the operations that can be performed on them. But we introduced all those operations and techniques in isolation from each other. In this section we are going to start with a data set, then logically explain how to process that data into something more useful and manageable, while solidifying the concepts from the previous chapter. In the next section we will use that cleaned data set to introduce data partitioning, an advanced concept used in production systems at many companies.

Getting Data

The data set we are going to use for the first two sections of this chapter comes from the United States Census Bureau. The United States Census Bureau is a principal agency of the U.S. Federal Statistical System, responsible for producing data about the American people and economy. Every ten years the Census Bureau takes a census to determine the population makeup of the country. The Modified Race Data 2010 data set includes the number of people in every age range, race, and sex group of every state and county in the United States for the year 2010. It is a great introductory data set because it needs processing to transform it into a more user-friendly state, which lets us practice concepts from the last chapter while teaching more advanced ones. The Modified Race Data 2010 page has a couple of PDFs that explain how the data set was created and the makeup of the columns.

Visit the Modified Race Data 2010 website. At the bottom of the page, click the link “Additional file formats are available on the FTP archive”, which takes you to a page that contains CSV versions of the data. Download the two CSV files stco-mr2010_al_mo.csv & stco-mr2010_mt_wy.csv to a local folder on your computer. The data set is sorted alphabetically by state: the file whose name contains “al_mo” covers states Alabama through Missouri, and the file whose name contains “mt_wy” covers Montana through Wyoming.

After downloading the CSV files locally to your computer, use the command line on Windows or the Terminal app on a Mac to upload the files to the edge node of your Hadoop cluster. Use the scp command to upload the files.

Mac and Linux example:

$ scp /Users/user_directory/Downloads/stco-mr2010_al_mo.csv user_name@cluster_ip_address:/home/your_user_name/stco-mr2010_al_mo.csv

$ scp /Users/user_directory/Downloads/stco-mr2010_mt_wy.csv user_name@cluster_ip_address:/home/your_user_name/stco-mr2010_mt_wy.csv

Windows example:

scp C:\Users\user_directory\Downloads\stco-mr2010_al_mo.csv user_name@cluster_ip_address:/home/your_user_name/stco-mr2010_al_mo.csv

scp C:\Users\user_directory\Downloads\stco-mr2010_mt_wy.csv user_name@cluster_ip_address:/home/your_user_name/stco-mr2010_mt_wy.csv

Once on the Hadoop edge node, use the hadoop fs -put command to upload the files from the edge node to a directory in HDFS. In this section, the CSV files are placed in an HDFS subdirectory called census_data_2010.

$ hadoop fs -put /home/your_user_name/stco-mr2010_al_mo.csv /user/some_dir/census_data_2010/stco-mr2010_al_mo.csv

$ hadoop fs -put /home/your_user_name/stco-mr2010_mt_wy.csv /user/some_dir/census_data_2010/stco-mr2010_mt_wy.csv
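
To verify the upload, list the contents of the HDFS directory; both CSV files should appear in the output (the path below matches the example above, so substitute your own directory):

$ hadoop fs -ls /user/some_dir/census_data_2010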

The files are now ready to be read by Spark. Now let’s get to coding.


Exercise 1: Creating a DataFrame

  1. Create a DataFrame called “df” from the two CSV files “stco-mr2010_al_mo.csv” and “stco-mr2010_mt_wy.csv”. Since both file names are similar, we can use wildcards (which we introduced in Chapter 1) to read both files into the DataFrame at once. We wrap the entire PySpark command in parentheses to execute a multi-line command. Additionally, we set the separator to “,”, infer the schema from the CSV files, and notify Spark of the presence of a header row. See the complete PySpark example below:
df = (spark.read
  .format("csv")
  .load("hdfs://…/census_data_2010/stco-mr2010_*.csv"
    , sep=","
    , inferSchema="true"
    , header="true")
)

The complete Spark Scala command looks like:

val df = spark.read
  .option("sep", ",")
  .option("inferSchema", "true")
  .option("header", "true")
  .csv("hdfs://…/census_data_2010/stco-mr2010_*.csv")
  2. To display the schema of the DataFrame, use the printSchema() method. In PySpark and Spark Scala it looks like:
df.printSchema()
root
 |-- SUMLEV: integer (nullable = true)
 |-- STATE: integer (nullable = true)
 |-- COUNTY: integer (nullable = true)
 |-- STNAME: string (nullable = true)
 |-- CTYNAME: string (nullable = true)
 |-- SEX: integer (nullable = true)
 |-- ORIGIN: integer (nullable = true)
 |-- AGEGRP: integer (nullable = true)
 |-- IMPRACE: integer (nullable = true)
 |-- RESPOP: integer (nullable = true)

From the tree output of the printSchema() method, we see there are ten columns. All of the columns are integers except “STNAME” and “CTYNAME”, which are strings.

  3. To output the first twenty rows of the DataFrame, use the show() method as in the PySpark and Spark Scala code below:
df.show()
+------+-----+------+-------+--------------+---+------+------+-------+------+
|SUMLEV|STATE|COUNTY| STNAME|       CTYNAME|SEX|ORIGIN|AGEGRP|IMPRACE|RESPOP|
+------+-----+------+-------+--------------+---+------+------+-------+------+
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|      1|  1337|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|      2|   356|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|      3|     2|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|      4|    13|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|      6|    33|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|      7|    11|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|      8|    17|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|     10|     4|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|     11|     1|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|     16|     1|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|     17|     1|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|     19|     1|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|     21|     1|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     2|      1|  1460|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     2|      2|   398|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     2|      3|    12|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     2|      4|    17|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     2|      6|    21|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     2|      7|     9|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     2|      8|    10|
+------+-----+------+-------+--------------+---+------+------+-------+------+
only showing top 20 rows

The schema of the DataFrame shows that many of the columns are integers. After reading the Modified Race Data 2010 Document, we see that most of the integer columns are categorical columns. The columns “SEX“, “ORIGIN“, “AGEGRP“, and “IMPRACE” hold numbers that correspond to text categories. For example, the “SEX” column has two categories, with 1 representing “Male” and 2 representing “Female”. The “ORIGIN” column indicates Hispanic origin: 1 means “Not Hispanic” and 2 means “Hispanic”. The “AGEGRP” column is the age group of the population; it has eighteen different age range groups, each with a text label similar to “Age 25 to 29 years”. The “IMPRACE” column is the imputed race group from the census and has thirty-one different race combinations. “RESPOP” is not a categorical column but is the only continuous column: it represents the number of people in the combined group of state, county, sex, origin, age group, and race. We can count the distinct values of various columns using the distinct() method, as in the examples below:

PySpark and Spark Scala code:

df.select("AGEGRP").distinct().count()

Output

18

PySpark code:

df.select(df["IMPRACE"]).distinct().count()
Out[1]: 31

Spark Scala code:

import spark.implicits._

df.select($"IMPRACE").distinct.count()
res1: Long = 31
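
As a quick sanity check on these category counts, the groupBy() and count() methods from the last chapter show how many rows fall into each category. A minimal PySpark sketch using the “SEX” column:

df.groupBy("SEX").count().show()

This would display one row per category, here the two sex categories, each with its row count.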

We can count the number of rows in the DataFrame and find there are 1,260,772 rows. 

PySpark and Spark Scala code:

df.count()
1260772

The integer values of the categorical columns of the DataFrame “df” are difficult to understand and use. Instead, we will create calculated columns using conditionals to convert the categorical numbers to their human-readable text values derived from the Modified Race Data 2010 Document. We will use the withColumn() method to create new calculated columns using the when() function to decide between different text outcomes based upon a condition.

In programming languages, a conditional is an if-then statement where a Boolean operation or expression is evaluated and returns one of multiple possible results. In a Spark DataFrame, a conditional is evaluated with the when() function. The when() function has two parameters: the first is the condition and the second is the value. The condition is the operation on a column to be evaluated, and the value is the result to be returned when the condition evaluates to true.

An optional method, otherwise(), can be chained to the end of the when() function to specify the output when the when() condition evaluates to false. otherwise() has only one parameter, the false outcome. If the otherwise() method isn’t used, any rows whose conditions evaluate to false are given a value of null.

Exercise 2: Processing the Sex and Origin Columns

In this exercise, we will walk through creating a conditional calculated column with one condition option.

  1. In this first example we will use dummy data so you can clearly see how Spark conditionals operate. First create a DataFrame called “people_df”, then call the withColumn() method, passing the new column name “new_column_name” as the first parameter. In the second parameter, use the when() function to evaluate the condition people_df["Sex"] == 1, which outputs “Male” when the condition is true. We will not specify an outcome for when the condition evaluates to false, so that outcome will be null. Lastly, display the contents of the new DataFrame. The PySpark code is:
from pyspark.sql import functions as F

people = (("Alice", 2), ("Jim", 1))

people_df = spark.createDataFrame(people, ["Name", "Sex"])

new_people_df.withColumn("new_column_name", F.when(people_df["Sex"] == 1, "Male"))

new_people_df.show()

Spark Scala code:

import spark.implicits._ // for `toDF`
import org.apache.spark.sql.functions._ // for `when`

val people = List(("Alice", 2), ("Jim", 1))

val people_df = spark.createDataFrame(people).toDF("Name", "Sex")

val new_people_df = people_df.withColumn("new_column_name", when(people_df("Sex") === 1, "Male"))

new_people_df.show()

Output:

+-----+---+---------------+
| Name|Sex|new_column_name|
+-----+---+---------------+
|Alice|  2|           null|
|  Jim|  1|           Male|
+-----+---+---------------+
  2. In the next example we will use the census DataFrame “df” to convert the “SEX” column of ones and twos to its string equivalents using the when() function along with the otherwise() method. Create a new column named “sex_mf” that returns “Male” when the column “SEX” is equal to 1 and returns “Female” when it is not. Display the first ten rows.

PySpark code:

from pyspark.sql import functions as F

df.withColumn("sex_mf", F.when(df['SEX'] == 1, "Male").otherwise("Female")).show(10)

Spark Scala code:

import spark.implicits._ // for `toDF` and $""
import org.apache.spark.sql.functions._ // for `when`

df.withColumn("sex_mf", when($"SEX" === 1, "Male").otherwise("Female")).show(10)

Output

+------+-----+------+-------+--------------+---+------+------+-------+------+------+
|SUMLEV|STATE|COUNTY| STNAME|       CTYNAME|SEX|ORIGIN|AGEGRP|IMPRACE|RESPOP|sex_mf|
+------+-----+------+-------+--------------+---+------+------+-------+------+------+
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|      1|  1337|  Male|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|      2|   356|  Male|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|      3|     2|  Male|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|      4|    13|  Male|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|      6|    33|  Male|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|      7|    11|  Male|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|      8|    17|  Male|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|     10|     4|  Male|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|     11|     1|  Male|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|     16|     1|  Male|
+------+-----+------+-------+--------------+---+------+------+-------+------+------+
only showing top 10 rows

The new column “sex_mf” displays on the right side of the DataFrame with the correct “Male” or “Female” text depending on the numerical value of the “SEX” column. To view the “Female” rows, chain the filter() method after the withColumn() method: in PySpark chain .filter(F.col("sex_mf") == "Female") and in Spark Scala chain .filter(col("sex_mf") === "Female"), as in the complete commands below.
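
PySpark code:

from pyspark.sql import functions as F

df.withColumn("sex_mf", F.when(df['SEX'] == 1, "Male").otherwise("Female")) \
  .filter(F.col("sex_mf") == "Female") \
  .show(10)

Spark Scala code:

import spark.implicits._ // for $""
import org.apache.spark.sql.functions._ // for `when` and `col`

df.withColumn("sex_mf", when($"SEX" === 1, "Male").otherwise("Female"))
  .filter(col("sex_mf") === "Female")
  .show(10)

Output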

+------+-----+------+-------+--------------+---+------+------+-------+------+------+
|SUMLEV|STATE|COUNTY| STNAME|       CTYNAME|SEX|ORIGIN|AGEGRP|IMPRACE|RESPOP|sex_mf|
+------+-----+------+-------+--------------+---+------+------+-------+------+------+
|    50|    1|     1|Alabama|Autauga County|  2|     1|     1|      1|  1260|Female|
|    50|    1|     1|Alabama|Autauga County|  2|     1|     1|      2|   313|Female|
|    50|    1|     1|Alabama|Autauga County|  2|     1|     1|      3|     2|Female|
|    50|    1|     1|Alabama|Autauga County|  2|     1|     1|      4|    15|Female|
|    50|    1|     1|Alabama|Autauga County|  2|     1|     1|      6|    33|Female|
|    50|    1|     1|Alabama|Autauga County|  2|     1|     1|      7|     7|Female|
|    50|    1|     1|Alabama|Autauga County|  2|     1|     1|      8|    10|Female|
|    50|    1|     1|Alabama|Autauga County|  2|     1|     1|     10|     6|Female|
|    50|    1|     1|Alabama|Autauga County|  2|     1|     1|     11|     3|Female|
|    50|    1|     1|Alabama|Autauga County|  2|     1|     1|     16|     2|Female|
+------+-----+------+-------+--------------+---+------+------+-------+------+------+
only showing top 10 rows
  3. We are not limited to adding only one calculated column; we can add many columns at the same time in both PySpark and Spark Scala. Below is an example of adding calculated columns based upon the “SEX” and “ORIGIN” columns at the same time. All that is required is to chain more withColumn() methods onto the original DataFrame. In this example, create a column “hispanic_origin” from the “ORIGIN” column: when the column “ORIGIN” is 2, return “Hispanic”, and otherwise return “Not Hispanic”. Display the results.

PySpark code:

from pyspark.sql import functions as F

df.withColumn("sex_mf", F.when(df['SEX'] == 1, "Male").otherwise("Female")) \
  .withColumn("hispanic_origin", F.when(df.ORIGIN == 2, "Hispanic").otherwise("Not Hispanic")) \
  .show(10)

Spark Scala code:

import spark.implicits._ // for `toDF` and $""
import org.apache.spark.sql.functions._ // for `when`

df.withColumn("sex_mf", when($"SEX" === 1, "Male").otherwise("Female"))
  .withColumn("hispanic_origin", when(df("SEX") === 2, "Hispanic").otherwise("Not Hispanic"))
  .show(10)

Output

+------+-----+------+-------+--------------+---+------+------+-------+------+------+---------------+
|SUMLEV|STATE|COUNTY| STNAME|       CTYNAME|SEX|ORIGIN|AGEGRP|IMPRACE|RESPOP|sex_mf|hispanic_origin|
+------+-----+------+-------+--------------+---+------+------+-------+------+------+---------------+
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|      1|  1337|  Male|   Not Hispanic|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|      2|   356|  Male|   Not Hispanic|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|      3|     2|  Male|   Not Hispanic|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|      4|    13|  Male|   Not Hispanic|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|      6|    33|  Male|   Not Hispanic|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|      7|    11|  Male|   Not Hispanic|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|      8|    17|  Male|   Not Hispanic|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|     10|     4|  Male|   Not Hispanic|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|     11|     1|  Male|   Not Hispanic|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|     16|     1|  Male|   Not Hispanic|
+------+-----+------+-------+--------------+---+------+------+-------+------+------+---------------+
only showing top 10 rows

So far we have only covered creating conditional columns when the condition is binary, that is, when there are only two choices. What happens when there are three or more options? Spark makes it very easy to accommodate them: all that is needed is to chain multiple when() functions together, one for each option you desire. Let’s look at an example using the “AGEGRP” column because it has eighteen unique values based upon the age brackets of the Census data.

Exercise 3: Multiple Conditionals

  1. In addition to the previous calculated columns “sex_mf” and “hispanic_origin“, add a new conditional calculated column named “age_group”. There will be nineteen different outcomes. The first eighteen correspond to the eighteen different age brackets in the census. The nineteenth and last outcome is the otherwise() method, which catches any values of the “AGEGRP” column outside the listed conditions. Inside the withColumn() method, chain together eighteen when() functions, one for each age category. The true values for each condition are the string age brackets outlined in the Modified Race Data 2010 Document.
    Tip: Using consistent indentation makes code with multiple when() conditionals easy to read and understand.

PySpark code:

from pyspark.sql import functions as F

df.withColumn("sex_mc", F.when(df['SEX'] == 1, "Male").otherwise("Female")) \
  .withColumn("hispanic_origin", F.when(df.ORIGIN == 2, "Hispanic").otherwise("Not Hispanic")) \
  .withColumn("age_group", 
              F.when(df["AGEGRP"] == 1, "Age 0 to 4 years") 
              .when(df["AGEGRP"] == 2, "Age 5 to 9 years") 
              .when(df["AGEGRP"] == 3, "Age 10 to 14 years")
              .when(df["AGEGRP"] == 4, "Age 15 to 19 years")
              .when(df["AGEGRP"] == 5, "Age 20 to 24 years")
              .when(df["AGEGRP"] == 6, "Age 25 to 29 years")
              .when(df["AGEGRP"] == 7, "Age 30 to 34 years")
              .when(df["AGEGRP"] == 8, "Age 35 to 39 years")
              .when(df["AGEGRP"] == 9, "Age 40 to 44 years")
              .when(df["AGEGRP"] == 10, "Age 45 to 49 years")
              .when(df["AGEGRP"] == 11, "Age 50 to 54 years")
              .when(df["AGEGRP"] == 12, "Age 55 to 59 years")
              .when(df["AGEGRP"] == 13, "Age 60 to 64 years")
              .when(df["AGEGRP"] == 14, "Age 65 to 69 years")
              .when(df["AGEGRP"] == 15, "Age 70 to 74 years")
              .when(df["AGEGRP"] == 16, "Age 75 to 79 years")
              .when(df["AGEGRP"] == 17, "Age 80 to 84 years")
              .when(df["AGEGRP"] == 18, "Age 85 years or older")
              .otherwise("No age given")
             ) \
  .show(10)

Spark Scala code:

import org.apache.spark.sql.functions._ // for `when`

df.withColumn("sex_mf", when($"SEX" === 1, "Male").otherwise("Female"))
  .withColumn("hispanic_origin", when(df("SEX") === 2, "Hispanic").otherwise("Not Hispanic"))
  .withColumn("age_group",
              when(df("AGEGRP") === 1, "Age 0 to 4 years") 
              .when(df("AGEGRP") === 2, "Age 5 to 9 years") 
              .when(df("AGEGRP") === 3, "Age 10 to 14 years")
              .when(df("AGEGRP") === 4, "Age 15 to 19 years")
              .when(df("AGEGRP") === 5, "Age 20 to 24 years")
              .when(df("AGEGRP") === 6, "Age 25 to 29 years")
              .when(df("AGEGRP") === 7, "Age 30 to 34 years")
              .when(df("AGEGRP") === 8, "Age 35 to 39 years")
              .when(df("AGEGRP") === 9, "Age 40 to 44 years")
              .when(df("AGEGRP") === 10, "Age 45 to 49 years")
              .when(df("AGEGRP") === 11, "Age 50 to 54 years")
              .when(df("AGEGRP") === 12, "Age 55 to 59 years")
              .when(df("AGEGRP") === 13, "Age 60 to 64 years")
              .when(df("AGEGRP") === 14, "Age 65 to 69 years")
              .when(df("AGEGRP") === 15, "Age 70 to 74 years")
              .when(df("AGEGRP") === 16, "Age 75 to 79 years")
              .when(df("AGEGRP") === 17, "Age 80 to 84 years")
              .when(df("AGEGRP") === 18, "Age 85 years or older")
              .otherwise("No age given")
             )
  .show(10)

Output

+------+-----+------+-------+--------------+---+------+------+-------+------+------+---------------+----------------+
|SUMLEV|STATE|COUNTY| STNAME|       CTYNAME|SEX|ORIGIN|AGEGRP|IMPRACE|RESPOP|sex_mf|hispanic_origin|       age_group|
+------+-----+------+-------+--------------+---+------+------+-------+------+------+---------------+----------------+
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|      1|  1337|  Male|   Not Hispanic|Age 0 to 4 years|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|      2|   356|  Male|   Not Hispanic|Age 0 to 4 years|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|      3|     2|  Male|   Not Hispanic|Age 0 to 4 years|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|      4|    13|  Male|   Not Hispanic|Age 0 to 4 years|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|      6|    33|  Male|   Not Hispanic|Age 0 to 4 years|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|      7|    11|  Male|   Not Hispanic|Age 0 to 4 years|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|      8|    17|  Male|   Not Hispanic|Age 0 to 4 years|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|     10|     4|  Male|   Not Hispanic|Age 0 to 4 years|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|     11|     1|  Male|   Not Hispanic|Age 0 to 4 years|
|    50|    1|     1|Alabama|Autauga County|  1|     1|     1|     16|     1|  Male|   Not Hispanic|Age 0 to 4 years|
+------+-----+------+-------+--------------+---+------+------+-------+------+------+---------------+----------------+
only showing top 10 rows

In the previous exercises we displayed the results of the new conditional calculated columns with the show() method. In the exercise below we will save the new columns to a new DataFrame named “census_df”. This will be the final processing of the original DataFrame. This exercise is a great showcase of applying many different operations at once, on the same original DataFrame, to produce one result. We will add our previous conditional columns, then add a new conditional calculated column, then rename several columns, and lastly drop unneeded columns. The final “census_df” DataFrame will have the following columns, in this order: “us_state”, “county_name”, “sex_mf”, “hispanic_origin”, “age_group”, “race”, and “population”.

Exercise 4: Putting it all together

In this exercise we will put together everything we have learned so far to produce a processed DataFrame in one operation.

Note:

The code in the steps of this exercise consists of snippets that all go together. The complete combined command is provided after the exercise.

  1. Create a new DataFrame named “census_df” from the original “df” DataFrame. Create the “sex_mf“, “hispanic_origin“, and “age_group” columns as outlined above. 

PySpark code:

from pyspark.sql import functions as F

census_df = df.withColumn("sex_mf", F.when(df['SEX'] == 1, "Male").otherwise("Female")) \
  .withColumn("hispanic_origin", F.when(df.ORIGIN == 2, "Hispanic").otherwise("Not Hispanic")) \
  .withColumn("age_group", 
              F.when(df["AGEGRP"] == 1, "Age 0 to 4 years") 
              .when(df["AGEGRP"] == 2, "Age 5 to 9 years") 
              .when(df["AGEGRP"] == 3, "Age 10 to 14 years")
              .when(df["AGEGRP"] == 4, "Age 15 to 19 years")
              .when(df["AGEGRP"] == 5, "Age 20 to 24 years")
              .when(df["AGEGRP"] == 6, "Age 25 to 29 years")
              .when(df["AGEGRP"] == 7, "Age 30 to 34 years")
              .when(df["AGEGRP"] == 8, "Age 35 to 39 years")
              .when(df["AGEGRP"] == 9, "Age 40 to 44 years")
              .when(df["AGEGRP"] == 10, "Age 45 to 49 years")
              .when(df["AGEGRP"] == 11, "Age 50 to 54 years")
              .when(df["AGEGRP"] == 12, "Age 55 to 59 years")
              .when(df["AGEGRP"] == 13, "Age 60 to 64 years")
              .when(df["AGEGRP"] == 14, "Age 65 to 69 years")
              .when(df["AGEGRP"] == 15, "Age 70 to 74 years")
              .when(df["AGEGRP"] == 16, "Age 75 to 79 years")
              .when(df["AGEGRP"] == 17, "Age 80 to 84 years")
              .when(df["AGEGRP"] == 18, "Age 85 years or older")
              .otherwise("No age given")
             ) \

Spark Scala

import spark.implicits._ // for `toDF` and $""
import org.apache.spark.sql.functions._ // for `when`

val census_df = df
  .withColumn("sex_mf", when($"SEX" === 1, "Male").otherwise("Female"))
  .withColumn("hispanic_origin", when(df("SEX") === 2, "Hispanic").otherwise("Not Hispanic"))
  .withColumn("age_group",
              when(df("AGEGRP") === 1, "Age 0 to 4 years") 
              .when(df("AGEGRP") === 2, "Age 5 to 9 years") 
              .when(df("AGEGRP") === 3, "Age 10 to 14 years")
              .when(df("AGEGRP") === 4, "Age 15 to 19 years")
              .when(df("AGEGRP") === 5, "Age 20 to 24 years")
              .when(df("AGEGRP") === 6, "Age 25 to 29 years")
              .when(df("AGEGRP") === 7, "Age 30 to 34 years")
              .when(df("AGEGRP") === 8, "Age 35 to 39 years")
              .when(df("AGEGRP") === 9, "Age 40 to 44 years")
              .when(df("AGEGRP") === 10, "Age 45 to 49 years")
              .when(df("AGEGRP") === 11, "Age 50 to 54 years")
              .when(df("AGEGRP") === 12, "Age 55 to 59 years")
              .when(df("AGEGRP") === 13, "Age 60 to 64 years")
              .when(df("AGEGRP") === 14, "Age 65 to 69 years")
              .when(df("AGEGRP") === 15, "Age 70 to 74 years")
              .when(df("AGEGRP") === 16, "Age 75 to 79 years")
              .when(df("AGEGRP") === 17, "Age 80 to 84 years")
              .when(df("AGEGRP") === 18, "Age 85 years or older")
              .otherwise("No age given")
             )
  2. Chain another conditional calculated column, named “race”, onto the code above, built from the column “IMPRACE“. Each category of the “IMPRACE” column represents a unique race, as outlined in the Modified Race Data 2010 Document. Add all thirty-one unique races plus the otherwise condition “No race provided”.

PySpark code:

  .withColumn("race",
              F.when(df["IMPRACE"] == 1, "White alone")
              .when(df["IMPRACE"] == 2, "Black or African American alone")
              .when(df["IMPRACE"] == 3, "American Indian and Alaska Native alone")
              .when(df["IMPRACE"] == 4, "Asian alone")
              .when(df["IMPRACE"] == 5, "Native Hawaiian and Other Pacific Islander alone")
              .when(df["IMPRACE"] == 6, "White and Black or African American")
              .when(df["IMPRACE"] == 7, "White and American Indian and Alaska Native")
              .when(df["IMPRACE"] == 8, "White and Asian")
              .when(df["IMPRACE"] == 9, "White and Native Hawaiian and Other Pacific Islander")
              .when(df["IMPRACE"] == 10, "Black or African American and American Indian and Alaska Native")
              .when(df["IMPRACE"] == 11, "Black or African American and American Indian and Alaska Native")
              .when(df["IMPRACE"] == 12, "Black or African American and Native Hawaiian and Other Pacific Islander")
              .when(df["IMPRACE"] == 13, "American Indian and Alaska Native and Asian")
              .when(df["IMPRACE"] == 14, "American Indian and Alaska Native and Native Hawaiian and Other Pacific Islander")
              .when(df["IMPRACE"] == 15, "Asian and Native Hawaiian and Other Pacific Islander")
              .when(df["IMPRACE"] == 16, "White and Black or African American and American Indian and Alaska Native")
              .when(df["IMPRACE"] == 17, "White and Black or African American and Asian")
              .when(df["IMPRACE"] == 18, "White and Black or African American and Native Hawaiian and Other Pacific Islander")
              .when(df["IMPRACE"] == 19, "White and American Indian and Alaska Native and Asian")
              .when(df["IMPRACE"] == 20, "White and American Indian and Alaska Native and Native Hawaiian and Other Pacific Islander")
              .when(df["IMPRACE"] == 21, "White and Asian and Native Hawaiian and Other Pacific Islander")
              .when(df["IMPRACE"] == 22, "Black or African American and American Indian and Alaska Native and Asian")
              .when(df["IMPRACE"] == 23, "Black or African American and American Indian and Alaska Native and Native Hawaiian and Other Pacific Islander")
              .when(df["IMPRACE"] == 24, "Black or African American and Asian and Native Hawaiian and Other Pacific Islander")
              .when(df["IMPRACE"] == 25, "American Indian and Alaska Native and Asian and Native Hawaiian and Other Pacific Islander")
              .when(df["IMPRACE"] == 26, "White and Black or African American and American Indian and Alaska Native and Asian")
              .when(df["IMPRACE"] == 27, "White and Black or African American and American Indian and Alaska Native and Native Hawaiian and Other Pacific Islander")
              .when(df["IMPRACE"] == 28, "White and Black or African American and Asian and Native Hawaiian and Other Pacific Islander")
              .when(df["IMPRACE"] == 29, "White and American Indian and Alaska Native and Asian and Native Hawaiian and Other Pacific Islander")
              .when(df["IMPRACE"] == 30, "Black or African American and American Indian and Alaska Native and Asian and Native Hawaiian and Other Pacific Islander")
              .when(df["IMPRACE"] == 31, "White and Black or African American and American Indian and Alaska Native and Asian and Native Hawaiian and Other Pacific Islander")
              .otherwise("No race provided")
             ) \

Spark Scala code:

  .withColumn("race",
              when($"IMPRACE" === 1, "White alone")
              .when($"IMPRACE" === 2, "Black or African American alone")
              .when($"IMPRACE" === 3, "American Indian and Alaska Native alone")
              .when($"IMPRACE" === 4, "Asian alone")
              .when($"IMPRACE" === 5, "Native Hawaiian and Other Pacific Islander alone")
              .when($"IMPRACE" === 6, "White and Black or African American")
              .when($"IMPRACE" === 7, "White and American Indian and Alaska Native")
              .when($"IMPRACE" === 8, "White and Asian")
              .when($"IMPRACE" === 9, "White and Native Hawaiian and Other Pacific Islander")
              .when($"IMPRACE" === 10, "Black or African American and American Indian and Alaska Native")
              .when($"IMPRACE" === 11, "Black or African American and American Indian and Alaska Native")
              .when($"IMPRACE" === 12, "Black or African American and Native Hawaiian and Other Pacific Islander")
              .when($"IMPRACE" === 13, "American Indian and Alaska Native and Asian")
              .when($"IMPRACE" === 14, "American Indian and Alaska Native and Native Hawaiian and Other Pacific Islander")
              .when($"IMPRACE" === 15, "Asian and Native Hawaiian and Other Pacific Islander")
              .when($"IMPRACE" === 16, "White and Black or African American and American Indian and Alaska Native")
              .when($"IMPRACE" === 17, "White and Black or African American and Asian")
              .when($"IMPRACE" === 18, "White and Black or African American and Native Hawaiian and Other Pacific Islander")
              .when($"IMPRACE" === 19, "White and American Indian and Alaska Native and Asian")
              .when($"IMPRACE" === 20, "White and American Indian and Alaska Native and Native Hawaiian and Other Pacific Islander")
              .when($"IMPRACE" === 21, "White and Asian and Native Hawaiian and Other Pacific Islander")
              .when($"IMPRACE" === 22, "Black or African American and American Indian and Alaska Native and Asian")
              .when($"IMPRACE" === 23, "Black or African American and American Indian and Alaska Native and Native Hawaiian and Other Pacific Islander")
              .when($"IMPRACE" === 24, "Black or African American and Asian and Native Hawaiian and Other Pacific Islander")
              .when($"IMPRACE" === 25, "American Indian and Alaska Native and Asian and Native Hawaiian and Other Pacific Islander")
              .when($"IMPRACE" === 26, "White and Black or African American and American Indian and Alaska Native and Asian")
              .when($"IMPRACE" === 27, "White and Black or African American and American Indian and Alaska Native and Native Hawaiian and Other Pacific Islander")
              .when($"IMPRACE" === 28, "White and Black or African American and Asian and Native Hawaiian and Other Pacific Islander")
              .when($"IMPRACE" === 29, "White and American Indian and Alaska Native and Asian and Native Hawaiian and Other Pacific Islander")
              .when($"IMPRACE" === 30, "Black or African American and American Indian and Alaska Native and Asian and Native Hawaiian and Other Pacific Islander")
              .when($"IMPRACE" === 31, "White and Black or African American and American Indian and Alaska Native and Asian and Native Hawaiian and Other Pacific Islander")
              .otherwise("No race provided")
             )
  3. Chain the withColumnRenamed() method to rename the columns “STNAME” to “us_state”, “CTYNAME” to “county_name”, and “RESPOP” to “population”.

PySpark

  .withColumnRenamed("STNAME", "us_state") \
  .withColumnRenamed("CTYNAME", "county_name") \
  .withColumnRenamed("RESPOP", "population") \

Spark Scala

  .withColumnRenamed("STNAME", "us_state")
  .withColumnRenamed("CTYNAME", "county_name")
  .withColumnRenamed("RESPOP", "population")
  4. Lastly, add the select() method to the code above to choose only the following columns: “us_state”, “county_name”, “sex_mf”, “hispanic_origin”, “age_group”, “race”, “population”.

PySpark and Spark Scala code:

  .select("us_state", "county_name", "sex_mf", "hispanic_origin", "age_group", "race", "population")
  5. Print the schema of the new DataFrame “census_df” and display its first ten rows, truncating each cell to forty characters. PySpark and Spark Scala code:
census_df.printSchema()

census_df.show(10, 40)
root
 |-- us_state: string (nullable = true)
 |-- county_name: string (nullable = true)
 |-- sex_mf: string (nullable = false)
 |-- hispanic_origin: string (nullable = false)
 |-- age_group: string (nullable = false)
 |-- race: string (nullable = false)
 |-- population: integer (nullable = true)

+--------+--------------+------+---------------+----------------+----------------------------------------+----------+
|us_state|   county_name|sex_mf|hispanic_origin|       age_group|                                    race|population|
+--------+--------------+------+---------------+----------------+----------------------------------------+----------+
| Alabama|Autauga County|  Male|   Not Hispanic|Age 0 to 4 years|                             White alone|      1337|
| Alabama|Autauga County|  Male|   Not Hispanic|Age 0 to 4 years|         Black or African American alone|       356|
| Alabama|Autauga County|  Male|   Not Hispanic|Age 0 to 4 years| American Indian and Alaska Native alone|         2|
| Alabama|Autauga County|  Male|   Not Hispanic|Age 0 to 4 years|                             Asian alone|        13|
| Alabama|Autauga County|  Male|   Not Hispanic|Age 0 to 4 years|     White and Black or African American|        33|
| Alabama|Autauga County|  Male|   Not Hispanic|Age 0 to 4 years|White and American Indian and Alaska ...|        11|
| Alabama|Autauga County|  Male|   Not Hispanic|Age 0 to 4 years|                         White and Asian|        17|
| Alabama|Autauga County|  Male|   Not Hispanic|Age 0 to 4 years|Black or African American and America...|         4|
| Alabama|Autauga County|  Male|   Not Hispanic|Age 0 to 4 years|Black or African American and America...|         1|
| Alabama|Autauga County|  Male|   Not Hispanic|Age 0 to 4 years|White and Black or African American a...|         1|
+--------+--------------+------+---------------+----------------+----------------------------------------+----------+
only showing top 10 rows

The final combined code to create the “census_df” DataFrame:

PySpark

from pyspark.sql import functions as F

census_df = df.withColumn("sex_mf", F.when(df['SEX'] == 1, "Male").otherwise("Female")) \
  .withColumn("hispanic_origin", F.when(df.ORIGIN == 2, "Hispanic").otherwise("Not Hispanic")) \
  .withColumn("age_group", 
              F.when(df["AGEGRP"] == 1, "Age 0 to 4 years") 
              .when(df["AGEGRP"] == 2, "Age 5 to 9 years") 
              .when(df["AGEGRP"] == 3, "Age 10 to 14 years")
              .when(df["AGEGRP"] == 4, "Age 15 to 19 years")
              .when(df["AGEGRP"] == 5, "Age 20 to 24 years")
              .when(df["AGEGRP"] == 6, "Age 25 to 29 years")
              .when(df["AGEGRP"] == 7, "Age 30 to 34 years")
              .when(df["AGEGRP"] == 8, "Age 35 to 39 years")
              .when(df["AGEGRP"] == 9, "Age 40 to 44 years")
              .when(df["AGEGRP"] == 10, "Age 45 to 49 years")
              .when(df["AGEGRP"] == 11, "Age 50 to 54 years")
              .when(df["AGEGRP"] == 12, "Age 55 to 59 years")
              .when(df["AGEGRP"] == 13, "Age 60 to 64 years")
              .when(df["AGEGRP"] == 14, "Age 65 to 69 years")
              .when(df["AGEGRP"] == 15, "Age 70 to 74 years")
              .when(df["AGEGRP"] == 16, "Age 75 to 79 years")
              .when(df["AGEGRP"] == 17, "Age 80 to 84 years")
              .when(df["AGEGRP"] == 18, "Age 85 years or older")
              .otherwise("No age given")
             ) \
  .withColumn("race",
              F.when(df["IMPRACE"] == 1, "White alone")
              .when(df["IMPRACE"] == 2, "Black or African American alone")
              .when(df["IMPRACE"] == 3, "American Indian and Alaska Native alone")
              .when(df["IMPRACE"] == 4, "Asian alone")
              .when(df["IMPRACE"] == 5, "Native Hawaiian and Other Pacific Islander alone")
              .when(df["IMPRACE"] == 6, "White and Black or African American")
              .when(df["IMPRACE"] == 7, "White and American Indian and Alaska Native")
              .when(df["IMPRACE"] == 8, "White and Asian")
              .when(df["IMPRACE"] == 9, "White and Native Hawaiian and Other Pacific Islander")
              .when(df["IMPRACE"] == 10, "Black or African American and American Indian and Alaska Native")
              .when(df["IMPRACE"] == 11, "Black or African American and American Indian and Alaska Native")
              .when(df["IMPRACE"] == 12, "Black or African American and Native Hawaiian and Other Pacific Islander")
              .when(df["IMPRACE"] == 13, "American Indian and Alaska Native and Asian")
              .when(df["IMPRACE"] == 14, "American Indian and Alaska Native and Native Hawaiian and Other Pacific Islander")
              .when(df["IMPRACE"] == 15, "Asian and Native Hawaiian and Other Pacific Islander")
              .when(df["IMPRACE"] == 16, "White and Black or African American and American Indian and Alaska Native")
              .when(df["IMPRACE"] == 17, "White and Black or African American and Asian")
              .when(df["IMPRACE"] == 18, "White and Black or African American and Native Hawaiian and Other Pacific Islander")
              .when(df["IMPRACE"] == 19, "White and American Indian and Alaska Native and Asian")
              .when(df["IMPRACE"] == 20, "White and American Indian and Alaska Native and Native Hawaiian and Other Pacific Islander")
              .when(df["IMPRACE"] == 21, "White and Asian and Native Hawaiian and Other Pacific Islander")
              .when(df["IMPRACE"] == 22, "Black or African American and American Indian and Alaska Native and Asian")
              .when(df["IMPRACE"] == 23, "Black or African American and American Indian and Alaska Native and Native Hawaiian and Other Pacific Islander")
              .when(df["IMPRACE"] == 24, "Black or African American and Asian and Native Hawaiian and Other Pacific Islander")
              .when(df["IMPRACE"] == 25, "American Indian and Alaska Native and Asian and Native Hawaiian and Other Pacific Islander")
              .when(df["IMPRACE"] == 26, "White and Black or African American and American Indian and Alaska Native and Asian")
              .when(df["IMPRACE"] == 27, "White and Black or African American and American Indian and Alaska Native and Native Hawaiian and Other Pacific Islander")
              .when(df["IMPRACE"] == 28, "White and Black or African American and Asian and Native Hawaiian and Other Pacific Islander")
              .when(df["IMPRACE"] == 29, "White and American Indian and Alaska Native and Asian and Native Hawaiian and Other Pacific Islander")
              .when(df["IMPRACE"] == 30, "Black or African American and American Indian and Alaska Native and Asian and Native Hawaiian and Other Pacific Islander")
              .when(df["IMPRACE"] == 31, "White and Black or African American and American Indian and Alaska Native and Asian and Native Hawaiian and Other Pacific Islander")
              .otherwise("No race provided")
             ) \
  .withColumnRenamed("STNAME", "us_state") \
  .withColumnRenamed("CTYNAME", "county_name") \
  .withColumnRenamed("RESPOP", "population") \
  .select("us_state", "county_name", "sex_mf", "hispanic_origin", "age_group", "race", "population")

Spark Scala

import spark.implicits._ // for `toDF` and $""
import org.apache.spark.sql.functions._ // for `when`

val census_df = df
  .withColumn("sex_mf", when($"SEX" === 1, "Male").otherwise("Female"))
  .withColumn("hispanic_origin", when(df("SEX") === 2, "Hispanic").otherwise("Not Hispanic"))
  .withColumn("age_group",
              when(df("AGEGRP") === 1, "Age 0 to 4 years") 
              .when(df("AGEGRP") === 2, "Age 5 to 9 years") 
              .when(df("AGEGRP") === 3, "Age 10 to 14 years")
              .when(df("AGEGRP") === 4, "Age 15 to 19 years")
              .when(df("AGEGRP") === 5, "Age 20 to 24 years")
              .when(df("AGEGRP") === 6, "Age 25 to 29 years")
              .when(df("AGEGRP") === 7, "Age 30 to 34 years")
              .when(df("AGEGRP") === 8, "Age 35 to 39 years")
              .when(df("AGEGRP") === 9, "Age 40 to 44 years")
              .when(df("AGEGRP") === 10, "Age 45 to 49 years")
              .when(df("AGEGRP") === 11, "Age 50 to 54 years")
              .when(df("AGEGRP") === 12, "Age 55 to 59 years")
              .when(df("AGEGRP") === 13, "Age 60 to 64 years")
              .when(df("AGEGRP") === 14, "Age 65 to 69 years")
              .when(df("AGEGRP") === 15, "Age 70 to 74 years")
              .when(df("AGEGRP") === 16, "Age 75 to 79 years")
              .when(df("AGEGRP") === 17, "Age 80 to 84 years")
              .when(df("AGEGRP") === 18, "Age 85 years or older")
              .otherwise("No age given")
             )
  .withColumn("race",
              when($"IMPRACE" === 1, "White alone")
              .when($"IMPRACE" === 2, "Black or African American alone")
              .when($"IMPRACE" === 3, "American Indian and Alaska Native alone")
              .when($"IMPRACE" === 4, "Asian alone")
              .when($"IMPRACE" === 5, "Native Hawaiian and Other Pacific Islander alone")
              .when($"IMPRACE" === 6, "White and Black or African American")
              .when($"IMPRACE" === 7, "White and American Indian and Alaska Native")
              .when($"IMPRACE" === 8, "White and Asian")
              .when($"IMPRACE" === 9, "White and Native Hawaiian and Other Pacific Islander")
              .when($"IMPRACE" === 10, "Black or African American and American Indian and Alaska Native")
              .when($"IMPRACE" === 11, "Black or African American and American Indian and Alaska Native")
              .when($"IMPRACE" === 12, "Black or African American and Native Hawaiian and Other Pacific Islander")
              .when($"IMPRACE" === 13, "American Indian and Alaska Native and Asian")
              .when($"IMPRACE" === 14, "American Indian and Alaska Native and Native Hawaiian and Other Pacific Islander")
              .when($"IMPRACE" === 15, "Asian and Native Hawaiian and Other Pacific Islander")
              .when($"IMPRACE" === 16, "White and Black or African American and American Indian and Alaska Native")
              .when($"IMPRACE" === 17, "White and Black or African American and Asian")
              .when($"IMPRACE" === 18, "White and Black or African American and Native Hawaiian and Other Pacific Islander")
              .when($"IMPRACE" === 19, "White and American Indian and Alaska Native and Asian")
              .when($"IMPRACE" === 20, "White and American Indian and Alaska Native and Native Hawaiian and Other Pacific Islander")
              .when($"IMPRACE" === 21, "White and Asian and Native Hawaiian and Other Pacific Islander")
              .when($"IMPRACE" === 22, "Black or African American and American Indian and Alaska Native and Asian")
              .when($"IMPRACE" === 23, "Black or African American and American Indian and Alaska Native and Native Hawaiian and Other Pacific Islander")
              .when($"IMPRACE" === 24, "Black or African American and Asian and Native Hawaiian and Other Pacific Islander")
              .when($"IMPRACE" === 25, "American Indian and Alaska Native and Asian and Native Hawaiian and Other Pacific Islander")
              .when($"IMPRACE" === 26, "White and Black or African American and American Indian and Alaska Native and Asian")
              .when($"IMPRACE" === 27, "White and Black or African American and American Indian and Alaska Native and Native Hawaiian and Other Pacific Islander")
              .when($"IMPRACE" === 28, "White and Black or African American and Asian and Native Hawaiian and Other Pacific Islander")
              .when($"IMPRACE" === 29, "White and American Indian and Alaska Native and Asian and Native Hawaiian and Other Pacific Islander")
              .when($"IMPRACE" === 30, "Black or African American and American Indian and Alaska Native and Asian and Native Hawaiian and Other Pacific Islander")
              .when($"IMPRACE" === 31, "White and Black or African American and American Indian and Alaska Native and Asian and Native Hawaiian and Other Pacific Islander")
              .otherwise("No race provided")
             )
  .withColumnRenamed("STNAME", "us_state")
  .withColumnRenamed("CTYNAME", "county_name")
  .withColumnRenamed("RESPOP", "population")
  .select("us_state", "county_name", "sex_mf", "hispanic_origin", "age_group", "race", "population")

Now that we have a cleaned and processed DataFrame, we can explore the US census data very easily. Let’s start with getting the total US population in 2010.

PySpark

from pyspark.sql import functions as F

census_df.agg(F.sum(census_df["population"])).show()

Spark Scala

import org.apache.spark.sql.functions.sum

census_df.agg(sum(census_df("population"))).show()

Output

+---------------+
|sum(population)|
+---------------+
|      308745538|
+---------------+

So, according to the 2010 US census, the population was 308,745,538 people. 

Instead of returning a DataFrame with one value, we can use the collect() method to get the actual value. This is helpful when you want to save a value from a DataFrame as a variable. The collect() method, when called on a DataFrame, returns a list in Python, or an array in Scala, of Row objects, one for each row of the DataFrame. Since the output is a list or array, we can select specific elements from it. Let’s look at an example.

PySpark

census_df.agg(F.sum(census_df["population"])).collect()

Output

Out[2]: [Row(sum(population)=308745538)]

Spark Scala

census_df.agg(sum(census_df("population"))).collect()

Output

res2: Array[org.apache.spark.sql.Row] = Array([308745538])

And we can index twice, each time at the zeroth element, to get only the value: the first index selects the single Row, and the second selects the first value inside that Row.

PySpark

total_population = census_df.agg(F.sum(census_df["population"])).collect()[0][0]

Spark Scala

val total_population = census_df.agg(sum(census_df("population"))).collect()(0)(0)
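
As a usage example, the extracted value behaves like any ordinary variable. Below is a minimal PySpark sketch, assuming the “census_df” DataFrame and total_population variable from above (the hispanic_population name is our own), that computes the Hispanic share of the 2010 population:

from pyspark.sql import functions as F

# Sum the population of the rows labeled "Hispanic", then pull out the scalar
hispanic_population = (census_df
  .filter(census_df["hispanic_origin"] == "Hispanic")
  .agg(F.sum("population"))
  .collect()[0][0])

# Both values are plain numbers now, so ordinary arithmetic works
print(hispanic_population / total_population)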

Exercise 5: Aggregate Data Exploration

In this exercise, we will use aggregate functions covered in Chapter 1 to explore the data of the “census_df” DataFrame.

  1. Use the “census_df” DataFrame to find the ten most populous states in America. Use the agg() method and the sum() function to sum the population of each state. Order the final DataFrame by population in descending order.

PySpark

from pyspark.sql import functions as F

(census_df.groupBy("us_state").agg(F.sum(census_df["population"]).alias("population"))
  .orderBy(F.col("population").desc())
  .show(10))

Spark Scala

import org.apache.spark.sql.functions.{col, sum}

census_df.groupBy("us_state").agg(sum(census_df("population")).alias("population"))
  .orderBy(col("population").desc)
  .show(10)

Output

+--------------+----------+
|      us_state|population|
+--------------+----------+
|    California|  37253956|
|         Texas|  25145561|
|      New York|  19378102|
|       Florida|  18801310|
|      Illinois|  12830632|
|  Pennsylvania|  12702379|
|          Ohio|  11536504|
|      Michigan|   9883640|
|       Georgia|   9687653|
|North Carolina|   9535483|
+--------------+----------+
only showing top 10 rows
  2. For a more difficult example, find the top ten counties in the state of Ohio by population of people in their thirties with a race of “Black or African American alone”.

PySpark

from pyspark.sql import functions as F

(census_df
  .filter(census_df["us_state"] == "Ohio")
  .filter((census_df["age_group"] == "Age 30 to 34 years") | (census_df["age_group"] == "Age 35 to 39 years"))
  .filter(census_df["race"] == "Black or African American alone")
  .groupBy("county_name")
  .agg(F.sum(census_df["population"]).alias("population"))
  .orderBy(F.col("population").desc())
  .show(10))

Spark Scala

import spark.implicits._
import org.apache.spark.sql.functions.{col, sum}

census_df
  .filter($"us_state" === "Ohio")
  .filter($"age_group" === "Age 30 to 34 years" || $"age_group" === "Age 35 to 39 years")
  .filter($"race" === "Black or African American alone")
  .groupBy("county_name")
  .agg(sum($"population").alias("population"))
  .orderBy(col("population").desc)
  .show(10)

Output

+-----------------+----------+
|      county_name|population|
+-----------------+----------+
|  Cuyahoga County|     45283|
|  Franklin County|     36218|
|  Hamilton County|     25444|
|Montgomery County|     13560|
|     Lucas County|     10681|
|    Summit County|      9628|
|  Mahoning County|      4747|
|    Butler County|      4053|
|    Lorain County|      3629|
|     Stark County|      3447|
+-----------------+----------+
only showing top 10 rows

This exercise showed more advanced DataFrame aggregation examples, including chaining many methods together into a single command.
