Introduction

So far, this Spark book has focused on Spark DataFrames: how to create, use, and transform them. You will use Spark DataFrames for the remainder of this book, but we want to introduce Datasets because, in reality, Spark DataFrames are a type of Spark Dataset. A Spark Dataset is a “strongly typed collection of domain-specific objects”. What this means is that a Dataset has a defined type (or schema) that is prescribed before the Dataset is created. Technically, a DataFrame is an untyped Dataset[Row], which means it doesn’t have a schema that is checked at compile time.
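
In fact, the Spark Scala API makes this relationship literal: DataFrame is nothing more than a type alias for Dataset[Row]. The line below paraphrases the definition that lives in the org.apache.spark.sql package object, shown here only for illustration:

// A DataFrame is simply a Dataset whose element type is the generic Row.
type DataFrame = Dataset[Row]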

The advantage of Datasets is that the schema is defined before the code is ever executed, which allows the compiler to check your code before it runs. For example, if an operation references a column with the wrong data type for that operation, Spark will detect the mismatch and raise a compile-time error (before your code runs) instead of a run-time error (while the code is executing). Datasets are a good fit for domain-specific data whose structure won’t change, as well as for production systems: using Datasets in your production code adds an extra layer of protection by surfacing errors at compile time rather than at run time.
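
As a minimal, made-up sketch of the difference, assume a hypothetical people_df DataFrame whose columns match the case class below (and the spark-shell’s implicits in scope):

case class Person(name: String, age: Int)
val people_ds = people_df.as[Person]

// Typed operation: a typo in a field name is caught before the job ever runs.
// people_ds.map(p => p.agee + 1)   // does not compile: value agee is not a member of Person
people_ds.map(p => p.age + 1)       // compiles; the field name and its type are checked by the compiler

// Untyped operation: the same typo in a column name only surfaces while the program is running,
// when Spark analyzes the query against the actual schema and throws an AnalysisException.
people_df.select($"agee" + 1)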

Because Datasets are built on Java Virtual Machine (JVM) types, the Dataset API is only available in JVM languages such as Scala and Java; in this book we use it from Scala. Unfortunately, Datasets cannot be used with PySpark. The good news is that all of the Spark DataFrame operations and commands we have learned in Scala so far can also be used with Datasets.

Create a Dataset

Datasets are very easy to create. You start with a Spark DataFrame and call one simple operation to convert it into a Dataset. 

In this section we will be using the Adult data set. The following Spark code creates a DataFrame from a CSV file, chaining together several operations: dropping several unneeded columns, renaming three columns to replace the dashes ("-") in their names with underscores, and renaming the "class" column (whose name is a reserved keyword in Scala) by appending "_label" to it.

val adult_df = spark.read.format("csv")
  .option("sep", ",")
  .option("inferSchema", "true")
  .option("header", "false")
  .load("hdfs://…/adult/adult_data.csv")
  // the raw file has no header row, so supply the column names ourselves
  .toDF("age", "workclass", "fnlwgt", "education", "education-num", "marital-status", "occupation", "relationship", "race", "sex", "capital-gain", "capital-loss", "hours-per-week", "native-country", "class")
  // drop columns we will not use
  .drop("fnlwgt", "education-num", "capital-gain", "capital-loss")
  // replace the dashes in column names with underscores
  .withColumnRenamed("marital-status", "marital_status")
  .withColumnRenamed("hours-per-week", "hours_per_week")
  .withColumnRenamed("native-country", "native_country")
  // "class" is a reserved keyword in Scala, so rename it
  .withColumnRenamed("class", "class_label")

The first step in creating a Dataset is to create a case class. A case class is how we define the type, or schema, of the eventual Dataset. To create a case class, start with the keywords “case class”, then an identifier (by convention the first letter is uppercase), and finally a parameter list of column names and column data types. A basic made-up example would look like:

case class Store(store_name: String, zipcode: Integer)

The parameter list is a set of key-value pairs of column names and column data types. The data types must use Scala’s capitalized type names, for example “String” instead of “string”. The case class for the Adult data set would be:

case class Adult(age: Integer, workclass: String, education: String, marital_status: String, occupation: String, relationship: String, race: String, sex: String, hours_per_week: Double, native_country: String, class_label: String)

To convert a Spark DataFrame to a Dataset, call the .as[] method on the DataFrame with the case class identifier inside the brackets. It is that easy! The adult_df DataFrame can be converted into a Spark Dataset with the following code:

val adult_ds = adult_df.as[Adult]
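
One caveat: the .as[] conversion relies on Spark’s implicit encoders for case classes. These are already in scope in the spark-shell, but in a compiled application you generally need to import them from your SparkSession first (a minimal sketch, assuming a SparkSession named spark):

import spark.implicits._   // brings the case class encoders (and the $"column" syntax) into scope
val adult_ds = adult_df.as[Adult]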

Now that we have a Dataset we can use any of the Scala DataFrame operations, functions, or methods.
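
For example, both the untyped, column-based operations we have been using and the typed, lambda-style operations work on adult_ds (an illustrative sketch; output not shown):

// Untyped, DataFrame-style operations still work on the Dataset.
adult_ds.groupBy("workclass").count().show()
adult_ds.filter($"age" > 40).select("age", "occupation").show(5)

// Typed operations use the fields of the Adult case class directly,
// so the compiler checks the field names and their types.
adult_ds.filter(a => a.hours_per_week > 40.0).map(a => a.education).show(5)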

Splitting Data into Training and Testing Subsets

A common technique in machine learning applications is to split a DataFrame into training and testing subsets of data. The typical machine learning workflow is to train your machine learning algorithm on the training subset and evaluate the performance of your model on the testing subset. Most often the training subset comprises 70% of the original DataFrame and the testing subset 30% (though this is a convention, not a standard). In Scala, to split a DataFrame into training and testing subsets we can use the randomSplit() method. The first parameter is an array of decimal numbers corresponding to the fraction of rows we would like each resulting DataFrame to contain. The second, optional parameter is the seed, an integer that lets us get the same samples of data no matter how many times we execute the method (on the same DataFrame). We can split our Datasets into training and testing Datasets the same way we would split up DataFrames. This would look like:

val Array(training_ds, testing_ds) = adult_ds.randomSplit(Array(0.7, 0.3), seed = 775)
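
Note that the weights passed to randomSplit() are approximate, so the two subsets will not contain exactly 70% and 30% of the rows; fixing the seed simply makes the same split repeatable. A quick sanity check of the split sizes (counts not reproduced here) would be:

println(s"training rows: ${training_ds.count()}")
println(s"testing rows:  ${testing_ds.count()}")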

Now that we have two Datasets, we could eventually train a machine learning algorithm on the training Dataset and evaluate it on the testing Dataset. However, one potential issue when splitting a Dataset (or DataFrame) into training and testing subsets involves categorical columns: because the rows are divided at random into 70% and 30% subsets, one subset may contain categories (i.e. distinct values) that the other subset does not. A model could then be trained on categorical values that never appear in the testing set, or asked to score categorical values it never saw during training, which can result in poor performance of the machine learning model. So it is often worthwhile to remove rows whose categorical values do not exist in the other subset.

Exercise 12: Dataset Manipulation

In this exercise we will remove rows from the training Dataset whose categories are not present in the testing Dataset.

  1. Using the code that we outlined above, create a Dataset from the Adult DataFrame. Then create a training and testing Dataset with a 70%/30% split. Lastly, count the rows in the training_ds so we can compare at the end. 
case class Adult(age: Integer, workclass: String, education: String, marital_status: String, occupation: String, relationship: String, race: String, sex: String, hours_per_week: Double, native_country: String, class_label: String)

val adult_ds = adult_df.as[Adult]

val Array(training_ds, testing_ds) = adult_ds.randomSplit(Array(0.7, 0.3), seed = 775)

training_ds.count

Output:

res1: Long = 22870
  2. The training_ds and testing_ds Datasets both have a column “native_country”, and training_ds contains more distinct values in that column than testing_ds does. To display the values that appear in one Dataset but not the other, we can use the except() method, which returns the rows of the first Dataset that are not present in the second. So select the "native_country" column of training_ds and call the except() method, passing in the "native_country" column selected from testing_ds. Then display the results.
training_ds.select("native_country").except(testing_ds.select("native_country")).show()

Output:

+-------------------+
|     native_country|
+-------------------+
|           Honduras|
| Holand-Netherlands|
+-------------------+
  3. The results above show there are two countries in the training_ds that are not in the testing_ds. We need to turn these values into a local array. We can do this by calling the collect() method to bring the rows back to the driver and then using the map() method to pull the country out of each Row, giving us an array of strings.
val excluded_countries = training_ds.select("native_country").except(testing_ds.select("native_country")).collect().map(_.getString(0))
  4. Now we can use this array to remove all the rows whose “native_country” value is in the array (keeping only the rows whose value is not in the array). To do that we will use the isin() method inside the not() method; the not() method negates everything inside of it. The isin() method does not take an array directly but a vararg, which is a comma-separated sequence of values. The Scala syntax that expands an array into a vararg is “: _*” (a short standalone sketch of varargs follows the exercise). We can call the filter() method on the training_ds and save the result under a new Dataset name to exclude all the rows with a “native_country” equal to “Honduras” or “Holand-Netherlands”.
val new_training_ds = training_ds.filter(not($"native_country".isin(excluded_countries:_*)))
  5. Count the rows of new_training_ds to verify that rows were removed.
new_training_ds.count

Output:

res2: Long = 22856

This exercise is a great example of how all of the methods and operations we learned on DataFrames work on Datasets as well.
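
As mentioned in step 4, the “: _*” syntax is plain Scala rather than anything Spark-specific. A tiny standalone sketch of a vararg method, and of expanding an array into one, would look like:

// A method with a vararg parameter: it accepts any number of String arguments.
def joinNames(names: String*): String = names.mkString(", ")

joinNames("Honduras", "Holand-Netherlands")      // called with individual arguments

val countries = Array("Honduras", "Holand-Netherlands")
joinNames(countries: _*)                         // ": _*" expands the array into individual arguments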


Summary

This chapter built upon the foundation laid in Chapter 1, DataFrames with Spark. We went from individual commands to writing machine learning pipelines, and introduced some advanced concepts in Spark that broaden the reader's horizons in the realm of production systems, statistics, and machine learning.

We started off by working through a data analysis problem from beginning to end. Next we covered how partitions work in DataFrames and how to write files to HDFS in a partitioned structure. Later we showed some powerful Spark operations for cleaning data and widened the scope of DataFrames from mere data to information that has meaning. Then we introduced statistics on DataFrames and how to construct machine learning pipelines. Lastly, we uncovered the realm of Datasets and how they help protect the data with defined types. 

In the next chapter, we are going to unveil the wide world of Spark SQL. You will learn how to use the Spark shell, write powerful SQL queries, and uncover the deep relationship between Spark SQL and Spark DataFrames. 

