This activity combines the skills and techniques you have learned so far in this chapter. It also introduces new concepts not covered previously.

As an intern at XYZ BigData Analytics Firm, you are progressing in your Spark skills, and your first project was a big success. Now you are tasked with getting a dataset and an ML Pipeline ready for machine learning algorithms. Your assignment has four parts:

  1. Cleaning up the dataset 
  2. Splitting the data into training and testing sets
  3. Making an ML Pipeline that one-hot encodes all the DataFrame features
  4. Saving the final DataFrame to HDFS with partitions

Steps

  1. Getting data: in this activity we will use the Adult data set from section 2.5 Hypothesis Testing. Create a DataFrame named adult_cat_df from adult_data.csv. The CSV doesn’t have a header, so we have to add the column names manually using the toDF() method, which can be chained directly after the load() method. Lastly, there are several columns that we know we are not going to use, so we can chain the drop() method after toDF() to remove them in the same command.
  2. To increase the likelihood that the eventual machine learning algorithm will make good predictions, the DataFrame needs to be as clean as we can justifiably make it. To do that we need to trim whitespace from each cell, remove duplicated rows, and drop “unknown” values. 
    You probably didn’t notice when you downloaded the adult_data.csv file back in section 2.5, but in the CSV file every string cell has a leading whitespace. Whitespace is any extra space character in the data, and leading whitespace is extra space at the beginning of a value. These leading spaces are a result of how the originator of the data created the CSV file: each comma is followed by a space. In Step 1 we specified the delimiter as ",". Spark dutifully split on the comma alone, and since a space is just another character, the space became part of the data in each cell. This is a great example because it shows the kind of small, unexpected things that make data dirty.
    This problem can easily be fixed in Spark by utilizing the function trim(). The function trim() works by removing all leading and trailing whitespaces from strings. Create a new DataFrame called trimmed_df from adult_cat_df that replaces each string column with a new version that has the whitespaces removed.
    Hint 1: This can be accomplished in one operation by using a Spark column method that we introduced in Chapter 1, DataFrames with Spark.
    Hint 2: The imports are from pyspark.sql.functions import trim for Python and import org.apache.spark.sql.functions.trim for Spark Scala.
  3. Drop duplicated rows from the trimmed_df DataFrame and save the new DataFrame in a variable called dups_dropped_df.
  4. If you inspected the original adult_cat_df you might have noticed there are no null values. Instead, unknown values are represented by the question mark character, “?”. These question marks are strings and would be interpreted by any future machine learning algorithm as just another category. We need to replace them with the actual Spark null value. Replace all “?” with the Spark null value and save the resulting DataFrame as replaced_questions_df.
  5. Now that our DataFrame has null values, we can decide what we want to do with them. We could leave them as is. But let’s say that we did some analysis on the eleven columns in our DataFrame and concluded that rows with three or more null values can be dropped. Remove all rows with at least three null values and save the resulting DataFrame as clean_df.
    Hint: There are eleven columns in the DataFrame. Dropping rows with “at least three” null values means keeping only rows with at least nine non-null values.
  6. Split the data into training and testing DataFrames where 70% of the rows are placed in the “training” DataFrame and the other 30% are placed in the “testing” DataFrame. Set the seed to 535.
    Hint: the same operation we showed for splitting Datasets will work for splitting DataFrames.
  7. The data has been sufficiently cleaned, and now we are ready to construct our ML Pipeline. Eventually, we want to create a “features” column that combines every column in the DataFrame except the label column, “class”. To do that we need to convert every feature column into indices representing its categories. But recently we heard about one-hot encoding from a coworker at XYZ BigData Analytics Firm. We did some research and learned that one-hot encoding is an algorithm that takes categorical features and transforms them into a form that helps machine learning algorithms make better predictions.
    Categorical features stored as indexes don’t do well in machine learning algorithms because the algorithm interprets a higher index as a larger quantity. For example, suppose we had a categorical feature “color” with four possible values, red, green, blue, and yellow, and this feature was mapped to the indexes 1, 2, 3, and 4 respectively. If this indexed color feature was used in a machine learning algorithm, the algorithm would interpret the number 4 (yellow) as greater than the number 2 (green) simply because 4 is larger than 2. This would result in very poor predictions. The solution is to transform the indexed features into a space where the distance between every pair of values is the same, and that is what one-hot encoding does. One-hot encoding takes numeric indexes and transforms them into binary vectors that machine learning algorithms can use without imposing an artificial order on the categories.
    Research Spark’s OneHotEncoderEstimator documentation. After string indexing every feature column in the DataFrame, pass all the indexed column names to the OneHotEncoderEstimator and output new column names with the suffix “_vec”. Then combine all the one-hot encoded feature columns using the VectorAssembler.
    Use the training DataFrame in the pipeline.fit() method and save the result as the variable model. Use the testing DataFrame in the model.transform() method and save the result as the variable transformed. Lastly, display the columns “age”, “age_index”, “age_vec”, “education”, “education_index”, “education_vec”, “class”, “class_index”, and “class_vec” of the transformed DataFrame so you can see what these columns look like. (On every StringIndexer use the parameter handleInvalid = "keep", otherwise you will get an error when writing the DataFrame to HDFS in Step 8.)
  8. Now that the transformed DataFrame has all of our original columns plus a “features” vector column that holds the data for all the feature columns, it is ready for machine learning algorithms. We want to save this DataFrame in HDFS so multiple data scientists can use the data. Write the DataFrame transformed to HDFS as parquet files, set the “overwrite” mode, partition the files by the “sex” column, and save all the files in an HDFS folder called “adult_ml_ready”.
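The color example from step 7 can be reproduced in a few lines of plain Python. This is only a sketch of the idea, not Spark's implementation: the category list and the 1 to 4 index mapping are taken from the example in the text, and the helper names `one_hot` and `sq_dist` are made up for illustration. Spark's encoder additionally stores its output as sparse vectors and, by default, drops the last category, which this sketch ignores.

```python
# Categories from the color example in step 7 (illustrative only).
colors = ["red", "green", "blue", "yellow"]

# String indexing: each category gets an integer (1..4, as in the text).
color_index = {color: i + 1 for i, color in enumerate(colors)}


def one_hot(color):
    """Return a vector with a single 1.0 in the position of `color`."""
    return [1.0 if c == color else 0.0 for c in colors]


def sq_dist(a, b):
    """Squared Euclidean distance between two equal-length vectors."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


# With plain indexes, yellow looks further from red than green does.
index_gap_red_green = abs(color_index["red"] - color_index["green"])
index_gap_red_yellow = abs(color_index["red"] - color_index["yellow"])

# With one-hot vectors, every pair of distinct colors is equally far apart.
onehot_gap_red_green = sq_dist(one_hot("red"), one_hot("green"))
onehot_gap_red_yellow = sq_dist(one_hot("red"), one_hot("yellow"))

print(color_index)
print(one_hot("yellow"))
print(index_gap_red_green, index_gap_red_yellow)
print(onehot_gap_red_green, onehot_gap_red_yellow)
```

The index gaps differ (1 versus 3) while the one-hot gaps are identical, which is the property the step describes.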

PySpark Solution

  1. Create DataFrame of categorical columns
adult_cat_df = spark.read.format("csv") \
  .load("hdfs://…/adult/adult_data.csv"
       , sep = ","
       , inferSchema = "true"
       , header = "false") \
  .toDF("age", "workclass", "fnlwgt", "education", "education-num", "marital-status", "occupation", "relationship", "race", "sex", "capital-gain", "capital-loss", "hours-per-week", "native-country", "class") \
  .drop("fnlwgt", "education-num", "capital-gain", "capital-loss")
  2. Remove whitespaces from every cell in the DataFrame
from pyspark.sql.functions import trim

trimmed_df = adult_cat_df \
  .withColumn("workclass", trim(adult_cat_df["workclass"])) \
  .withColumn("education", trim(adult_cat_df["education"])) \
  .withColumn("marital-status", trim(adult_cat_df["marital-status"])) \
  .withColumn("occupation", trim(adult_cat_df["occupation"])) \
  .withColumn("relationship", trim(adult_cat_df["relationship"])) \
  .withColumn("race", trim(adult_cat_df["race"])) \
  .withColumn("sex", trim(adult_cat_df["sex"])) \
  .withColumn("native-country", trim(adult_cat_df["native-country"])) \
  .withColumn("class", trim(adult_cat_df["class"]))
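The leading whitespace described in step 2 can be reproduced without Spark. The sketch below uses Python's standard csv module on two abbreviated, illustrative rows written in the same "comma followed by a space" style; the rows are not copied from adult_data.csv. Stripping each cell afterwards plays the role that trim() plays in the solution above, although str.strip() removes any whitespace, not only spaces.

```python
import csv
import io

# Two illustrative rows in "comma then space" style (not real file contents).
raw = "39, State-gov, Bachelors\n50, Self-emp-not-inc, Bachelors\n"

# Splitting on "," alone keeps the space as part of each following value.
naive_rows = list(csv.reader(io.StringIO(raw), delimiter=","))

# Stripping each cell afterwards is the plain-Python analogue of trim().
trimmed_rows = [[cell.strip() for cell in row] for row in naive_rows]

print(naive_rows[0])
print(trimmed_rows[0])
```

The first print shows values such as ' State-gov' with the space still attached, and the second shows the cleaned values.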
  3. Drop duplicated rows
dups_dropped_df = trimmed_df.dropDuplicates()
  4. Replace every cell that is “?” with the null value
replaced_questions_df = dups_dropped_df.replace("?", None)
  5. Remove rows that have fewer than 9 non-null values
clean_df = replaced_questions_df.dropna(thresh = 9)
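The arithmetic behind the threshold of 9 is easy to get backwards, so here is a plain-Python sketch of the same two rules: replace “?” with a null, then keep only rows with at least nine non-null values. It is a model of the logic, not of Spark itself: `None` stands in for Spark's null, and the rows and the helper names `replace_questions`, `non_null_count`, and `keep_row` are hypothetical.

```python
N_COLUMNS = 11  # number of columns left after the drop() in step 1
THRESH = 9      # minimum number of non-null values a row needs to be kept


def replace_questions(row):
    """Turn every "?" cell into None, leaving other cells unchanged."""
    return [None if value == "?" else value for value in row]


def non_null_count(row):
    """Count the cells in a row that are not None."""
    return sum(value is not None for value in row)


def keep_row(row, thresh=THRESH):
    """Keep a row only if it has at least `thresh` non-null values."""
    return non_null_count(row) >= thresh


# Hypothetical 11-column rows with two and three unknown values.
two_unknowns = replace_questions(["x"] * 9 + ["?"] * 2)
three_unknowns = replace_questions(["x"] * 8 + ["?"] * 3)

print(keep_row(two_unknowns))    # 9 non-null values
print(keep_row(three_unknowns))  # 8 non-null values
print(N_COLUMNS - THRESH)        # most nulls a kept row can have
```

A row with two nulls survives and a row with three does not, which matches “drop rows with at least three null values”.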
  6. Split the cleaned DataFrame into training and testing DataFrames
training, testing = clean_df.randomSplit([0.7, 0.3], seed = 535)
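Two properties of a seeded random split are worth seeing in isolation: the same seed reproduces the same split, and the 70/30 proportions are approximate rather than exact because each row is assigned by its own random draw. The sketch below illustrates both in plain Python. The function `random_split` is a made-up stand-in, and Spark's actual sampler differs in its details, so the row counts here will not match what Spark produces.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to one split using a single seeded draw per row."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative, normalised boundaries, roughly [0.7, 1.0] for [0.7, 0.3].
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # The last split catches any draw left over by float rounding.
            if draw < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


rows = list(range(1000))
training, testing = random_split(rows, [0.7, 0.3], seed=535)
training_again, testing_again = random_split(rows, [0.7, 0.3], seed=535)

print(len(training), len(testing))
print(training == training_again and testing == testing_again)
```

Every row lands in exactly one split, and rerunning with seed 535 gives the same two lists.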
  7. Create an ML Pipeline with StringIndexers, OneHotEncoderEstimator, and VectorAssembler, then apply it to the training and testing DataFrames
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoderEstimator

age_indexer = StringIndexer(
  inputCol = "age"
  , outputCol = "age_index"
  , handleInvalid = "keep"
)

workclass_indexer = StringIndexer(
  inputCol = "workclass"
  , outputCol = "workclass_index"
  , handleInvalid = "keep"
)

education_indexer = StringIndexer(
  inputCol = "education"
  , outputCol = "education_index"
  , handleInvalid = "keep"
)

marital_indexer = StringIndexer(
  inputCol = "marital-status"
  , outputCol = "marital-status_index"
  , handleInvalid = "keep"
)

occupation_indexer = StringIndexer(
  inputCol = "occupation"
  , outputCol = "occupation_index"
  , handleInvalid = "keep"
)

relationship_indexer = StringIndexer(
  inputCol = "relationship"
  , outputCol = "relationship_index"
  , handleInvalid = "keep"
)

race_indexer = StringIndexer(
  inputCol = "race"
  , outputCol = "race_index"
  , handleInvalid = "keep"
)

sex_indexer = StringIndexer(
  inputCol = "sex"
  , outputCol = "sex_index"
  , handleInvalid = "keep"
)

hours_indexer = StringIndexer(
  inputCol = "hours-per-week"
  , outputCol = "hours-per-week_index"
  , handleInvalid = "keep"
)

country_indexer = StringIndexer(
  inputCol = "native-country"
  , outputCol = "native-country_index"
  , handleInvalid = "keep"
)

class_indexer = StringIndexer(
  inputCol = "class"
  , outputCol = "class_index"
  , handleInvalid = "keep"
)

encoder = OneHotEncoderEstimator(
  inputCols = ["age_index", "workclass_index", "education_index", "marital-status_index", "occupation_index", "relationship_index", "race_index", "sex_index", "hours-per-week_index", "native-country_index", "class_index"]
  , outputCols = ["age_vec", "workclass_vec", "education_vec", "marital-status_vec", "occupation_vec", "relationship_vec", "race_vec", "sex_vec", "hours-per-week_vec", "native-country_vec", "class_vec"]
)

assembler = VectorAssembler(
  inputCols = ["age_vec", "workclass_vec", "education_vec", "marital-status_vec", "occupation_vec", "relationship_vec", "race_vec", "sex_vec", "hours-per-week_vec", "native-country_vec"]
  , outputCol = "features"
)

pipeline = Pipeline(stages = [age_indexer, workclass_indexer, education_indexer, marital_indexer, occupation_indexer, relationship_indexer, race_indexer, sex_indexer, hours_indexer, country_indexer, class_indexer, encoder, assembler])

model = pipeline.fit(training)

transformed = model.transform(testing)

transformed.select("age", "age_index", "age_vec", "education", "education_index", "education_vec", "class", "class_index", "class_vec").show(10, truncate=False)

Output

+---+---------+---------------+------------+---------------+---------------+-----+-----------+-------------+
|age|age_index|age_vec        |education   |education_index|education_vec  |class|class_index|class_vec    |
+---+---------+---------------+------------+---------------+---------------+-----+-----------+-------------+
|19 |29.0     |(73,[29],[1.0])|HS-grad     |0.0            |(16,[0],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
|20 |26.0     |(73,[26],[1.0])|HS-grad     |0.0            |(16,[0],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
|23 |10.0     |(73,[10],[1.0])|9th         |10.0           |(16,[10],[1.0])|<=50K|0.0        |(2,[0],[1.0])|
|23 |10.0     |(73,[10],[1.0])|HS-grad     |0.0            |(16,[0],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
|23 |10.0     |(73,[10],[1.0])|Some-college|1.0            |(16,[1],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
|24 |22.0     |(73,[22],[1.0])|HS-grad     |0.0            |(16,[0],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
|24 |22.0     |(73,[22],[1.0])|Bachelors   |2.0            |(16,[2],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
|25 |8.0      |(73,[8],[1.0]) |Bachelors   |2.0            |(16,[2],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
|27 |12.0     |(73,[12],[1.0])|11th        |5.0            |(16,[5],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
|29 |13.0     |(73,[13],[1.0])|7th-8th     |8.0            |(16,[8],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
+---+---------+---------------+------------+---------------+---------------+-----+-----------+-------------+
only showing top 10 rows
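The “_vec” columns in the output above are printed in sparse notation: `(size,[indices],[values])`. A short plain-Python sketch shows how to read it by expanding a triple into an ordinary list. The helper `to_dense` is hypothetical and exists only for this illustration.

```python
def to_dense(size, indices, values):
    """Expand a (size, [indices], [values]) sparse triple into a plain list."""
    dense = [0.0] * size
    for index, value in zip(indices, values):
        dense[index] = value
    return dense


# First row of the output above: age 19 -> age_index 29.0 -> (73,[29],[1.0])
age_vec = to_dense(73, [29], [1.0])

# class "<=50K" -> class_index 0.0 -> (2,[0],[1.0])
class_vec = to_dense(2, [0], [1.0])

print(len(age_vec), age_vec.index(1.0), sum(age_vec))
print(class_vec)
```

So `(73,[29],[1.0])` is a vector of length 73 that is zero everywhere except for a single 1.0 at position 29, the same position as the value in age_index.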
  8. Write the final output DataFrame to HDFS as parquet files that are partitioned by the "sex" column
(transformed.write
  .format("parquet")
  .mode("overwrite")
  .partitionBy("sex")
  .save("hdfs://…/adult_ml_ready"))
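Partitioning by a column means the writer creates one sub-directory per distinct value, named `column=value`, and the partition column itself is carried by the directory name rather than stored inside the data files. The plain-Python sketch below mimics only that grouping so the resulting layout is easy to picture. The rows, the relative base path, and the helper `partition_dirs` are hypothetical, and no files are actually written.

```python
from collections import defaultdict


def partition_dirs(rows, column, base_path):
    """Group rows by `column` and return {directory name: rows in it}."""
    dirs = defaultdict(list)
    for row in rows:
        # The partition column moves into the directory name (column=value).
        data = {key: value for key, value in row.items() if key != column}
        dirs[f"{base_path}/{column}={row[column]}"].append(data)
    return dict(dirs)


# Hypothetical rows with only a few of the real columns.
rows = [
    {"age": 19, "sex": "Male", "class": "<=50K"},
    {"age": 23, "sex": "Female", "class": "<=50K"},
    {"age": 24, "sex": "Male", "class": "<=50K"},
]

layout = partition_dirs(rows, "sex", "adult_ml_ready")
for directory in sorted(layout):
    print(directory, len(layout[directory]))
```

A reader that later filters on “sex” only needs to open the matching directory, which is the main reason to partition on a column that is frequently used as a filter.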

Spark Scala Solution

  1. Create DataFrame of categorical columns
val adult_cat_df = spark.read.format("csv")
  .option("sep", ",")
  .option("inferSchema", "true")
  .option("header", "false")
  .load("hdfs://…/adult/adult_data.csv")
  .toDF("age", "workclass", "fnlwgt", "education", "education-num", "marital-status", "occupation", "relationship", "race", "sex", "capital-gain", "capital-loss", "hours-per-week", "native-country", "class")
  .drop("fnlwgt", "education-num", "capital-gain", "capital-loss")
  2. Remove whitespaces from every cell in the DataFrame
import org.apache.spark.sql.functions.trim

val trimmed_df = adult_cat_df
  .withColumn("workclass", trim(adult_cat_df("workclass")))
  .withColumn("education", trim(adult_cat_df("education")))
  .withColumn("marital-status", trim(adult_cat_df("marital-status")))
  .withColumn("occupation", trim(adult_cat_df("occupation")))
  .withColumn("relationship", trim(adult_cat_df("relationship")))
  .withColumn("race", trim(adult_cat_df("race")))
  .withColumn("sex", trim(adult_cat_df("sex")))
  .withColumn("native-country", trim(adult_cat_df("native-country")))
  .withColumn("class", trim(adult_cat_df("class")))
  3. Drop duplicated rows
val dups_dropped_df = trimmed_df.dropDuplicates
  4. Replace every cell that is “?” with the null value
val replaced_questions_df = dups_dropped_df.na.replace("*", Map("?" -> null))
  5. Remove rows that have fewer than 9 non-null values
val clean_df = replaced_questions_df.na.drop(minNonNulls = 9)
  6. Split the cleaned DataFrame into training and testing DataFrames
val Array(training, testing) = clean_df.randomSplit(Array(0.7, 0.3), seed = 535)
  7. Create an ML Pipeline with StringIndexers, OneHotEncoderEstimator, and VectorAssembler, then apply it to the training and testing DataFrames
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.OneHotEncoderEstimator

val age_indexer = new StringIndexer()
  .setInputCol("age")
  .setOutputCol("age_index")
  .setHandleInvalid("keep")

val workclass_indexer = new StringIndexer()
  .setInputCol("workclass")
  .setOutputCol("workclass_index")
  .setHandleInvalid("keep")

val education_indexer = new StringIndexer()
  .setInputCol("education")
  .setOutputCol("education_index")
  .setHandleInvalid("keep")

val marital_indexer = new StringIndexer()
  .setInputCol("marital-status")
  .setOutputCol("marital-status_index")
  .setHandleInvalid("keep")

val occupation_indexer = new StringIndexer()
  .setInputCol("occupation")
  .setOutputCol("occupation_index")
  .setHandleInvalid("keep")

val relationship_indexer = new StringIndexer()
  .setInputCol("relationship")
  .setOutputCol("relationship_index")
  .setHandleInvalid("keep")

val race_indexer = new StringIndexer()
  .setInputCol("race")
  .setOutputCol("race_index")
  .setHandleInvalid("keep")

val sex_indexer = new StringIndexer()
  .setInputCol("sex")
  .setOutputCol("sex_index")
  .setHandleInvalid("keep")

val hours_indexer = new StringIndexer()
  .setInputCol("hours-per-week")
  .setOutputCol("hours-per-week_index")
  .setHandleInvalid("keep")

val country_indexer = new StringIndexer()
  .setInputCol("native-country")
  .setOutputCol("native-country_index")
  .setHandleInvalid("keep")

val class_indexer = new StringIndexer()
  .setInputCol("class")
  .setOutputCol("class_index")
  .setHandleInvalid("keep")

val encoder = new OneHotEncoderEstimator()
  .setInputCols(Array("age_index", "workclass_index", "education_index", "marital-status_index", "occupation_index", "relationship_index", "race_index", "sex_index", "hours-per-week_index", "native-country_index", "class_index"))
  .setOutputCols(Array("age_vec", "workclass_vec", "education_vec", "marital-status_vec", "occupation_vec", "relationship_vec", "race_vec", "sex_vec", "hours-per-week_vec", "native-country_vec", "class_vec"))

val assembler = new VectorAssembler()
  .setInputCols(Array("age_vec", "workclass_vec", "education_vec", "marital-status_vec", "occupation_vec", "relationship_vec", "race_vec", "sex_vec", "hours-per-week_vec", "native-country_vec"))
  .setOutputCol("features")

val pipeline = new Pipeline()
  .setStages(Array(age_indexer, workclass_indexer, education_indexer, marital_indexer, occupation_indexer, relationship_indexer, race_indexer, sex_indexer, hours_indexer, country_indexer, class_indexer, encoder, assembler))

val model = pipeline.fit(training)

val transformed = model.transform(testing)

transformed.select("age", "age_index", "age_vec", "education", "education_index", "education_vec", "class", "class_index", "class_vec").show(10, truncate=false)

Output

+---+---------+---------------+------------+---------------+---------------+-----+-----------+-------------+
|age|age_index|age_vec        |education   |education_index|education_vec  |class|class_index|class_vec    |
+---+---------+---------------+------------+---------------+---------------+-----+-----------+-------------+
|19 |29.0     |(73,[29],[1.0])|HS-grad     |0.0            |(16,[0],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
|20 |26.0     |(73,[26],[1.0])|HS-grad     |0.0            |(16,[0],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
|23 |10.0     |(73,[10],[1.0])|9th         |10.0           |(16,[10],[1.0])|<=50K|0.0        |(2,[0],[1.0])|
|23 |10.0     |(73,[10],[1.0])|HS-grad     |0.0            |(16,[0],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
|23 |10.0     |(73,[10],[1.0])|Some-college|1.0            |(16,[1],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
|24 |22.0     |(73,[22],[1.0])|HS-grad     |0.0            |(16,[0],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
|24 |22.0     |(73,[22],[1.0])|Bachelors   |2.0            |(16,[2],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
|25 |8.0      |(73,[8],[1.0]) |Bachelors   |2.0            |(16,[2],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
|27 |12.0     |(73,[12],[1.0])|11th        |5.0            |(16,[5],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
|29 |13.0     |(73,[13],[1.0])|7th-8th     |8.0            |(16,[8],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
+---+---------+---------------+------------+---------------+---------------+-----+-----------+-------------+
only showing top 10 rows

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.OneHotEncoderEstimator
age_indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_ce85416a0455
workclass_indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_be118b443e19
education_indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_df938b4672a3
marital_indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_1af9515f5f9f
occupation_indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_01999795db5f
relationship_indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_6b3df0212148
race_indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_5dcce3d7bad7
sex_indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_a6243870ce44
hours_indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_42470fc0dc83
country_indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_3fee12e95018
class_indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_885c3517e457
encoder: org.apache.spark.ml.feature.OneHotEncoderEstimator = oneHotEncoder_72dc93dd8048
assembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_c5e2665cac69
pipeline: org.apache.spark.ml.Pipeline = pipeline_f18136bc5e3a
model: org.apache.spark.ml.PipelineModel = pipeline_f18136bc5e3a
transformed: org.apache.spark.sql.DataFrame = [age: int, workclass: string ... 32 more fields]
  8. Write the final output DataFrame to HDFS as parquet files that are partitioned by the "sex" column
transformed.write
  .format("parquet")
  .mode("overwrite")
  .partitionBy("sex")
  .save("hdfs://…/adult_ml_ready")
