This activity will combine the skills and techniques you learned so far in this chapter. 

Imagine you are working as an intern at XYZ BigData Analytics Firm, where you are tasked with performing data analysis on a data set to solidify your Spark DataFrame skills. 

In this activity you will use the data set “Kew Museum of Economic Botany: Object Dispersals to Schools, 1877-1982”, which “is a record of object dispersals from the Kew Museum of Economic Botany to schools between 1877 and 1982” and “is based on entries in the ‘Specimens Distributed’ books (or exit books) in the Museum archive.” The data set is available at https://figshare.com/articles/Schools_dispersals_data_with_BHL_links/9573956. This activity is good practice for a real-world scenario because it simulates being handed a data set you have no prior connection to and being asked to process and analyze it.

Steps

  1. The CSV file “schools_dispersals_data_with_bhl_links.csv” can be downloaded from https://figshare.com/articles/dataset/Schools_dispersals_data_with_BHL_links/9573956. Once on the page, click the “Download” button, or use the direct download link: https://ndownloader.figshare.com/files/17208041
  2. From a command-line application on your computer, use the scp command to copy the CSV file from your local machine to the edge node of your Hadoop cluster. Then use the hadoop fs -put command to copy the CSV file from the edge node into HDFS.
  3. Once the CSV file is in HDFS, we can use it to create a Spark DataFrame. Use any of the methods introduced in Section 1.3 for reading CSV files from HDFS to create a DataFrame. Be sure to set the inferSchema parameter so Spark attempts to infer the schema from the data. Save the DataFrame to the variable name df.
  4. Print the schema of the DataFrame. Compare it with the sample data preview on the website page linked in Step 1 to check whether the inferred schema matches the data.
  5. Inspecting the DataFrame’s schema shows fifteen columns in total: two integer columns and the rest strings. We also notice right away that the column names are long, verbose, and contain spaces. Rename all fifteen columns to the following succinct, space-free names:
"event_date_string"
"period"
"page_number"
"recipient_name"
"formal_school_name"
"name"
"school_name"
"city"
"level_of_study"
"school_type"
"denomination_or_org"
"object_count"
"event_info"
"book_link_1"
"book_link_2"

Hint: watch out for column names with extra spaces.

  6. Now we can look at the data. Display the first thirty rows of the DataFrame.
  7. The last two columns, "book_link_1" and "book_link_2", contain only URL links, and they are full of blank, null, or missing values. Drop the two book link columns.
  8. The column "event_date_string" has a string data type because it is not formatted as a date Spark recognizes: instead of "/" or "-" it uses dots ("."). Create a new column named "event_date" with the date data type.

    Hint: the normal way of converting a column to another data type won’t work because Spark cannot interpret the format of the strings. It will need to be explicitly told how to make a date out of the strings. Research the PySpark or Spark Scala function to_date.

    Second hint: "dd.MM.yyyy"
  9. Now that we have a proper date column we no longer need the string date column. Also, the column "period" appears to hold the same value in every row, and the "event_info" column has many nulls and doesn’t seem very useful. We don’t want to drop these columns permanently because we might use them in the future, so instead select every column in the DataFrame except "event_date_string", "period", and "event_info". Also, move the new "event_date" column to the first (leftmost) position. Save the new DataFrame to the variable kew_df so we can always retrieve the excluded columns from the variable df.
  10. Sort the new DataFrame on the "event_date" column from smallest to largest date.
  11. Count the number of rows in the DataFrame.
  12. Display the count of unique cities in the DataFrame. Then display the first sixty unique city names in alphabetical order from A to Z.
  13. For each group in "level_of_study", find the sum of "object_count" and display the results ordered by that sum from smallest to largest.
  14. Find the average page number for each year in the "event_date" column. Display all forty-three rows of the resulting DataFrame.

    Hint: Research the PySpark or Spark Scala function year.
  15. Find the minimum number of objects for each group of school type and denomination. In the resulting DataFrame the minimum-objects column should be called "min_objects". Display all forty-eight resulting rows.

PySpark Solution

  1. These instructions are for a Mac using the Terminal (or similar) app. For Windows instructions: buy a Mac 😉
# This command copies the CSV file from your local Mac computer to the edge node of your Hadoop cluster
# assumes the CSV is in your Downloads directory
# (Run this command in the Terminal app of your local computer)
$ scp -rp /Users/your_directory/Downloads/Schools%20dispersals%20data%20with%20BHL%20links.csv user_name@cluster_ip_address:/home/your_user_name
# This command copies the CSV file from your Hadoop cluster's edge node to HDFS on your Hadoop cluster
# Places the CSV file in a directory named "data" in HDFS
# (Log into your Hadoop cluster and run this command)
$ hadoop fs -put /home/your_user_name/Schools%20dispersals%20data%20with%20BHL%20links.csv /user/your_user_name/data
  2. Create the DataFrame by using the read method in conjunction with the load() method on the CSV file, using the following code:
df = spark.read.format("csv").load("hdfs:///user/your_user_name/data/Schools%20dispersals%20data%20with%20BHL%20links.csv"
                                  , sep=","
                                  , inferSchema="true"
                                  , header="true")
  3. Print the schema using the following code: df.printSchema()
  4. Use the withColumnRenamed() method to rename every column. All of the withColumnRenamed() calls can be chained together. Be careful: the column name “Level of study ” has a trailing space. Without including that space the name won’t match and the column won’t be renamed:
df = df.withColumnRenamed("EventDate", "event_date_string") \
  .withColumnRenamed("Period", "period") \
  .withColumnRenamed("Page number (if applic)", "page_number") \
  .withColumnRenamed("Recipient name (original transcription)", "recipient_name") \
  .withColumnRenamed("Nature of recipient (original transcription)", "formal_school_name") \
  .withColumnRenamed("name", "name") \
  .withColumnRenamed("Institution - common name from Exit Books", "school_name") \
  .withColumnRenamed("Town/City", "city") \
  .withColumnRenamed("Level of study ", "level_of_study") \
  .withColumnRenamed("School type", "school_type") \
  .withColumnRenamed("Denomination/organisation", "denomination_or_org") \
  .withColumnRenamed("No. of objects", "object_count") \
  .withColumnRenamed("Event information (any extra information not in list of specimens)", "event_info") \
  .withColumnRenamed("BHL Exit Book Link", "book_link_1") \
  .withColumnRenamed("BHL School Letter Book Link (Vol 1 only)", "book_link_2")
  5. Display data using the following code: df.show(30)
  6. Use the drop() method to drop columns. Multiple columns can be dropped in one operation by separating the column names by commas using the following code:
df = df.drop("book_link_1", "book_link_2")
  7. Use the function to_date to convert the string column into a date type. The first parameter is the column to convert and the second parameter is the date format of the current string column:
from pyspark.sql.types import DateType
from pyspark.sql.functions import to_date

df = df.withColumn("event_date", to_date(df["event_date_string"], "dd.MM.yyyy"))
  8. Use the select() method on the DataFrame to choose which columns appear in the new DataFrame. Since the column names are passed as a comma-separated list, we can put them in any order; here, place the new date column "event_date" first to make it the first column in the new DataFrame:
kew_df = df.select("event_date", "page_number", "recipient_name", "formal_school_name", "name", "school_name", "city", "level_of_study", "school_type", "denomination_or_org", "object_count")
  9. Use the orderBy() method to sort the DataFrame. Since dates sort chronologically just like numbers, use .asc() to sort from the oldest (smallest) to the newest (largest) date:
kew_df = kew_df.orderBy(kew_df["event_date"].asc())
  10. Use the count() method to count the number of rows in the DataFrame: kew_df.count()
  11. Select the column "city" and then call the distinct() method to return the unique values in that column. Then call the count() method to get the count of those distinct cities:
kew_df.select(kew_df["city"]).distinct().count()
  12. After calling the distinct() method, use the orderBy() method to sort the resulting DataFrame column. Use the col() function in the orderBy() method to reference the column after the distinct operation. Then use the show() method to display the first sixty rows:
from pyspark.sql.functions import col

kew_df.select(kew_df["city"]).distinct().orderBy(col("city").asc()).show(60)
  13. Call the groupBy() method and then the agg() method. Inside the agg() method use the sum function on the "object_count" column. Lastly, call the orderBy() method on the generated column name "sum(object_count)":
from pyspark.sql import functions as F

kew_df.groupBy("level_of_study").agg(F.sum(kew_df["object_count"])).orderBy(F.col("sum(object_count)")).show()
  14. It is possible to call a function inside the groupBy() method. Use the year function on the column "event_date" to group all of the rows by year. Then aggregate with the average function. Show all forty-three rows:
from pyspark.sql import functions as F

kew_df.groupBy(F.year("event_date")).agg(F.avg(kew_df["page_number"])).show(43)
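The generated headers year(event_date) and avg(page_number) are not very readable; aliasing both and sorting by year tidies the output. A sketch of the same aggregation (the alias names are illustrative):
from pyspark.sql import functions as F

# Alias the grouping key and the aggregate, then sort chronologically
kew_df.groupBy(F.year("event_date").alias("year")) \
  .agg(F.avg("page_number").alias("avg_page_number")) \
  .orderBy("year") \
  .show(43)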
  15. Use multiple columns in the groupBy() method. Then use the agg() method to find the minimum of the "object_count" column. Chain the alias() method onto the minimum operation inside the agg function:
from pyspark.sql import functions as F

kew_df.groupBy("school_type", "denomination_or_org").agg(F.min(kew_df["object_count"]).alias("min_objects")).show(50)

Spark Scala Solution

  1. These instructions are for a Mac using the Terminal (or similar) app. For Windows instructions: buy a Mac 😉
# This command copies the CSV file from your local Mac computer to the edge node of your Hadoop cluster
# assumes the CSV is in your Downloads directory
# (Run this command in the Terminal app of your local computer)
$ scp -rp /Users/your_directory/Downloads/Schools%20dispersals%20data%20with%20BHL%20links.csv user_name@cluster_ip_address:/home/your_user_name
# This command copies the CSV file from your Hadoop cluster's edge node to HDFS on your Hadoop cluster
# Places the CSV file in a directory named "data" in HDFS
# (Log into your Hadoop cluster and run this command)
$ hadoop fs -put /home/your_user_name/Schools%20dispersals%20data%20with%20BHL%20links.csv /user/your_user_name/data
  2. Use the read method in conjunction with the load() method to create a DataFrame from the CSV file:
val df = spark.read
  .format("csv")
  .option("sep", ",")
  .option("header", "true")
  .option("inferSchema", "true")
.load("hdfs://user/your_user_name/data/Schools%20dispersals%20data%20with%20BHL%20links.csv")
  3. Print the schema using the following code: df.printSchema
  4. Use the withColumnRenamed() method to rename every column. All of the withColumnRenamed() calls can be chained together. Be careful: the column name "Level of study " has a trailing space. Without including that space the name won’t match and the column won’t be renamed:
val scala_df = df
  .withColumnRenamed("EventDate", "event_date_string")
  .withColumnRenamed("Period", "period")
  .withColumnRenamed("Page number (if applic)", "page_number")
  .withColumnRenamed("Recipient name (original transcription)", "recipient_name")
  .withColumnRenamed("Nature of recipient (original transcription)", "formal_school_name")
  .withColumnRenamed("name", "name")
  .withColumnRenamed("Institution - common name from Exit Books", "school_name")
  .withColumnRenamed("Town/City", "city")
  .withColumnRenamed("Level of study ", "level_of_study")
  .withColumnRenamed("School type", "school_type")
  .withColumnRenamed("Denomination/organisation", "denomination_or_org")
  .withColumnRenamed("No. of objects", "object_count")
  .withColumnRenamed("Event information (any extra information not in list of specimens)", "event_info")
  .withColumnRenamed("BHL Exit Book Link", "book_link_1")
  .withColumnRenamed("BHL School Letter Book Link (Vol 1 only)", "book_link_2")
  5. Display data using the following code: scala_df.show(30)
  6. Use the drop() method to drop columns. Multiple columns can be dropped in one operation by separating the column names by commas:
val drop_df = scala_df.drop("book_link_1", "book_link_2")
  7. Use the function to_date to convert the string column into a date type. The first parameter is the column to convert and the second parameter is the date format of the current string column:
import org.apache.spark.sql.types.DateType
import org.apache.spark.sql.functions.to_date

val date_df = drop_df.withColumn("event_date", to_date(drop_df("event_date_string"), "dd.MM.yyyy"))
  8. Use the select() method on the DataFrame to choose which columns appear in the new DataFrame. Since the column names are passed as a comma-separated list, we can put them in any order; here, place the new date column "event_date" first to make it the first column in the new DataFrame:
val select_df = date_df.select("event_date", "page_number", "recipient_name", "formal_school_name", "name", "school_name", "city", "level_of_study", "school_type", "denomination_or_org", "object_count")
  9. Use the orderBy() method to sort the DataFrame. Since dates sort chronologically just like numbers, use .asc to sort from the oldest (smallest) to the newest (largest) date:
val kew_df = select_df.orderBy(select_df("event_date").asc)
  10. Use the count() method to count the number of rows in the DataFrame: kew_df.count
  11. Select the column "city" and then call the distinct() method to return the unique values in that column. Then call the count() method to get the count of those distinct cities:
kew_df.select(kew_df("city")).distinct.count
  12. After calling the distinct() method, use the orderBy() method to sort the resulting DataFrame column. Use the col() function in the orderBy() method to reference the column after the distinct operation. Then use the show() method to display the first sixty rows:
import org.apache.spark.sql.functions.col

kew_df.select(kew_df("city")).distinct.orderBy(col("city").asc).show(60)
  13. Call the groupBy() method and then the agg() method. Inside the agg() method use the sum function on the "object_count" column. Lastly, call the orderBy() method on the generated column name "sum(object_count)":
import org.apache.spark.sql.functions.{sum, col}

kew_df.groupBy("level_of_study").agg(sum(kew_df("object_count"))).orderBy(col("sum(object_count)")).show()
  14. It is possible to call a function inside the groupBy() method. Use the year function on the column "event_date" to group all of the rows by year. Then aggregate with the average function. Show all forty-three rows:
import org.apache.spark.sql.functions.{avg, year}

kew_df.groupBy(year(kew_df("event_date"))).agg(avg(kew_df("page_number"))).show(43)
  15. Use multiple columns in the groupBy() method. Then use the agg() method to find the minimum of the "object_count" column. Chain the alias() method onto the minimum operation inside the agg function:
import org.apache.spark.sql.functions.min

kew_df.groupBy("school_type", "denomination_or_org").agg(min(kew_df("object_count")).alias("min_objects")).show(50)
