This activity combines the skills and techniques you have learned so far in this chapter and also introduces new concepts not covered previously.
As an intern at XYZ BigData Analytics Firm, you are progressing in your Spark skills and your first project was a big success. Now you are tasked with getting a dataset and an ML Pipeline ready for machine learning algorithms. Your assignment will have four parts:
- Cleaning up the dataset
- Splitting the data into training and testing sets
- Making an ML Pipeline that one-hot encodes all the DataFrame features
- Saving the final DataFrame to HDFS with partitions
Steps
- Getting data: in this activity we will be using the Adult data set that was used in section 2.5, Hypothesis Testing. Create a DataFrame named adult_cat_df from the adult_data.csv file. The CSV doesn't have a header, so we will have to add the column names manually using the toDF() method. The toDF() method can be chained directly after the load() method. Lastly, there are several columns that we know we are not going to use, so we can chain the drop() method after the toDF() method to remove those columns in the same command.
- To increase the likelihood that the eventual machine learning algorithm will make good predictions, the DataFrame needs to be as clean as we can justifiably make it. To do that we need to trim whitespace from each cell, remove duplicated rows, and drop "unknown" values.
You probably didn't notice when you downloaded the adult_data.csv file from the website all the way back in section 2.5, but in the CSV file every string cell has a leading whitespace. Whitespace is made up of extra space characters in data, and leading whitespace is extra space characters at the beginning of a value. These leading spaces are a result of how the CSV file was created by the originator of the data. In Step 1 we specified the delimiter as "," and Spark dutifully split on it, but since a space is just another character, the spaces were incorporated into the data of each cell. This is a great example for us because it shows the kinds of random things that can happen to make data dirty.
This problem can easily be fixed in Spark with the trim() function, which removes all leading and trailing whitespace from strings. Create a new DataFrame called trimmed_df from adult_cat_df that replaces each string column with a new version that has the whitespace removed.
Hint 1: This can be accomplished all in one operation by using a Spark method dealing with columns that we introduced in Chapter 1, DataFrames with Spark.
Hint 2: The imports are from pyspark.sql.functions import trim for Python and import org.apache.spark.sql.functions.trim for Spark Scala.
- Drop duplicated rows from the trimmed_df DataFrame and save the new DataFrame as a variable named dups_dropped_df.
- If you inspected the original adult_cat_df you might have noticed there are no null values. Instead, unknown values are represented by the question mark character, "?". These question marks are string characters and would be interpreted by any future machine learning algorithm as just another string category. We need to replace them with the actual Spark null value. Replace all "?" with the Spark null value and save the resulting DataFrame as replaced_questions_df.
- Now that our DataFrame has null values, we can decide what we want to do with them. We could leave them as is. But let's say that we did some analysis on the eleven columns in our DataFrame and decided that rows with three or more null values can be dropped. Remove all rows with at least three null values in them and save the resulting DataFrame as clean_df.
Hint: There are eleven columns in the DataFrame. Dropping rows with "at least three" null values means we want to keep rows with at least nine non-null values.
- Split the data into training and testing DataFrames where 70% of the rows are placed in the "training" DataFrame and the other 30% are placed in the "testing" DataFrame. Set the seed to 535.
Hint: the same operation we used for splitting Datasets will work for splitting DataFrames.
- The data has been sufficiently cleaned and now we are ready to construct our ML Pipeline. Eventually, we want to create a "features" column built from every column in the DataFrame except the label feature, "class". So we will need to convert every feature column into indices representing the categories. But recently we heard about one-hot encoding from a coworker at XYZ BigData Analytics Firm. We did some research and learned that one-hot encoding is an algorithm that takes categorical features and transforms them into a form that helps machine learning algorithms make better predictions. Categorical features left as indexes don't do well in machine learning algorithms, because the algorithm interprets a higher categorical index as better simply because the number is higher. For example, suppose we had a categorical feature "color" with four possible values, red, green, blue, and yellow, and this feature was mapped to the indexes 1, 2, 3, 4 respectively. If this new color index feature were used in a machine learning algorithm, the algorithm would interpret the number 4, which corresponds to yellow, as better than 2 (which is green) because the number 4 is larger than the number 2. This would result in very poor machine learning predictions. The solution is to transform the indexed features into a space where the distance between each value is the same. That solution is one-hot encoding. One-hot encoding takes number indexes and transforms them into a binary form that can be used in machine learning algorithms while preserving the true, unordered nature of the categorical features (a short illustrative sketch follows these steps). Research Spark's OneHotEncoderEstimator documentation. After string indexing every feature column in the DataFrame, pass all the indexed feature column names to the OneHotEncoderEstimator and output new column names with the suffix "_vec". Then combine all the one-hot encoded variables using the VectorAssembler. Use the training DataFrame with the pipeline.fit() method and save the result as the variable model. Use the testing DataFrame with the model.transform() method and save the result as the variable transformed. Lastly, display the transformed DataFrame columns ("age", "age_index", "age_vec", "education", "education_index", "education_vec", "class", "class_index", "class_vec") so you can see what all these columns look like. (On every StringIndexer use the parameter handleInvalid = "keep", otherwise you will get an error when writing the DataFrame to HDFS in Step 8.)
- Now that the transformed DataFrame has all of our original columns plus a "features" column, a vector that holds the data from all the feature columns, it is ready for machine learning algorithms. We want to save this DataFrame in HDFS so multiple data scientists can use this data. Write the transformed DataFrame to HDFS as parquet files, set the "overwrite" mode, partition the files by the "sex" column, and save all the files in an HDFS folder called "adult_ml_ready".
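Before moving on to the solutions, here is a minimal, self-contained sketch of the color example from step 7. The four-row colors DataFrame is invented purely for illustration, and spark is assumed to be an active SparkSession. It shows what a StringIndexer followed by a OneHotEncoderEstimator actually produces; note that by default the encoder drops the last category, so four colors become one-hot vectors of length three.

from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator

# A made-up DataFrame just to illustrate the color example from step 7.
colors_df = spark.createDataFrame([("red",), ("green",), ("blue",), ("yellow",)], ["color"])

# Map each color string to a numeric index...
indexer = StringIndexer(inputCol = "color", outputCol = "color_index")
indexed_df = indexer.fit(colors_df).transform(colors_df)

# ...then one-hot encode the index. By default the last category is dropped,
# so four colors become sparse vectors of length three, all equally distant from one another.
encoder = OneHotEncoderEstimator(inputCols = ["color_index"], outputCols = ["color_vec"])
encoder.fit(indexed_df).transform(indexed_df).show()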
PySpark Solution
- Create DataFrame of categorical columns
adult_cat_df = spark.read.format("csv") \
    .load("hdfs://…/adult/adult_data.csv", sep = ",", inferSchema = "true", header = "false") \
    .toDF("age", "workclass", "fnlwgt", "education", "education-num", "marital-status",
          "occupation", "relationship", "race", "sex", "capital-gain", "capital-loss",
          "hours-per-week", "native-country", "class") \
    .drop("fnlwgt", "education-num", "capital-gain", "capital-loss")
- Remove whitespaces from every cell in the DataFrame
from pyspark.sql.functions import trim

trimmed_df = adult_cat_df \
    .withColumn("workclass", trim(adult_cat_df["workclass"])) \
    .withColumn("education", trim(adult_cat_df["education"])) \
    .withColumn("marital-status", trim(adult_cat_df["marital-status"])) \
    .withColumn("occupation", trim(adult_cat_df["occupation"])) \
    .withColumn("relationship", trim(adult_cat_df["relationship"])) \
    .withColumn("race", trim(adult_cat_df["race"])) \
    .withColumn("sex", trim(adult_cat_df["sex"])) \
    .withColumn("native-country", trim(adult_cat_df["native-country"])) \
    .withColumn("class", trim(adult_cat_df["class"]))
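As a side note, if you would rather not spell out each column by hand, the same result can be sketched with a single select over the DataFrame's columns. This is only an alternative illustration, not part of the assignment, and it assumes you want to trim every string column while passing the numeric columns through untouched.

from pyspark.sql.functions import trim, col

# Trim every string column in one pass; non-string columns are passed through unchanged.
string_cols = [f.name for f in adult_cat_df.schema.fields if f.dataType.simpleString() == "string"]
trimmed_alt_df = adult_cat_df.select(
    [trim(col(c)).alias(c) if c in string_cols else col(c) for c in adult_cat_df.columns])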
- Drop duplicated rows
dups_dropped_df = trimmed_df.dropDuplicates()
- Replace every cell that is "?" with the null value
replaced_questions_df = dups_dropped_df.replace("?", None)
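If you want to convince yourself that the replacement worked, a quick spot check like the one below should show the "?" count before the replacement matching the null count after it (the choice of the "occupation" column here is arbitrary).

from pyspark.sql.functions import col

# Number of "?" cells in one column before the replacement...
dups_dropped_df.filter(col("occupation") == "?").count()
# ...should equal the number of nulls in the same column after the replacement.
replaced_questions_df.filter(col("occupation").isNull()).count()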
- Remove rows that have fewer than 9 non-null values
clean_df = replaced_questions_df.dropna(thresh = 9)
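The thresh argument counts non-null values, so with eleven columns, dropping rows that contain three or more nulls is the same as keeping rows with at least 11 - 2 = 9 non-null values. A rough way to verify this (just a sanity-check sketch, not required by the activity) is to count the rows with three or more nulls and compare that with the number of rows that were dropped.

from functools import reduce
from pyspark.sql.functions import col, when

# Build a column holding the number of null cells in each row (a 0/1 flag per column, summed).
null_count = reduce(lambda a, b: a + b,
                    [when(col(c).isNull(), 1).otherwise(0) for c in replaced_questions_df.columns])

# Rows with three or more nulls -- this count should equal the number of rows removed by thresh = 9.
replaced_questions_df.filter(null_count >= 3).count()
replaced_questions_df.count() - clean_df.count()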
- Split the cleaned DataFrame into training and testing DataFrames
training, testing = clean_df.randomSplit([0.7, 0.3], seed = 535)
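Keep in mind that randomSplit produces an approximate split, so the row counts will be close to, but not exactly, 70% and 30%. A quick check:

# Roughly 70% / 30% of the cleaned rows; the exact counts vary slightly because the split is random.
print(training.count(), testing.count())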
- Create an ML Pipeline with StringIndexers, OneHotEncoderEstimator, and VectorAssembler, and apply it to the training and testing DataFrames
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoderEstimator

age_indexer = StringIndexer(inputCol = "age", outputCol = "age_index", handleInvalid = "keep")
workclass_indexer = StringIndexer(inputCol = "workclass", outputCol = "workclass_index", handleInvalid = "keep")
education_indexer = StringIndexer(inputCol = "education", outputCol = "education_index", handleInvalid = "keep")
marital_indexer = StringIndexer(inputCol = "marital-status", outputCol = "marital-status_index", handleInvalid = "keep")
occupation_indexer = StringIndexer(inputCol = "occupation", outputCol = "occupation_index", handleInvalid = "keep")
relationship_indexer = StringIndexer(inputCol = "relationship", outputCol = "relationship_index", handleInvalid = "keep")
race_indexer = StringIndexer(inputCol = "race", outputCol = "race_index", handleInvalid = "keep")
sex_indexer = StringIndexer(inputCol = "sex", outputCol = "sex_index", handleInvalid = "keep")
hours_indexer = StringIndexer(inputCol = "hours-per-week", outputCol = "hours-per-week_index", handleInvalid = "keep")
country_indexer = StringIndexer(inputCol = "native-country", outputCol = "native-country_index", handleInvalid = "keep")
class_indexer = StringIndexer(inputCol = "class", outputCol = "class_index", handleInvalid = "keep")

encoder = OneHotEncoderEstimator(
    inputCols = ["age_index", "workclass_index", "education_index", "marital-status_index",
                 "occupation_index", "relationship_index", "race_index", "sex_index",
                 "hours-per-week_index", "native-country_index", "class_index"],
    outputCols = ["age_vec", "workclass_vec", "education_vec", "marital-status_vec",
                  "occupation_vec", "relationship_vec", "race_vec", "sex_vec",
                  "hours-per-week_vec", "native-country_vec", "class_vec"])

assembler = VectorAssembler(
    inputCols = ["age_vec", "workclass_vec", "education_vec", "marital-status_vec",
                 "occupation_vec", "relationship_vec", "race_vec", "sex_vec",
                 "hours-per-week_vec", "native-country_vec"],
    outputCol = "features")

pipeline = Pipeline(stages = [age_indexer, workclass_indexer, education_indexer, marital_indexer,
                              occupation_indexer, relationship_indexer, race_indexer, sex_indexer,
                              hours_indexer, country_indexer, class_indexer, encoder, assembler])

model = pipeline.fit(training)
transformed = model.transform(testing)

transformed.select("age", "age_index", "age_vec", "education", "education_index", "education_vec",
                   "class", "class_index", "class_vec").show(10, truncate=False)
Output
+---+---------+---------------+------------+---------------+---------------+-----+-----------+-------------+
|age|age_index|age_vec        |education   |education_index|education_vec  |class|class_index|class_vec    |
+---+---------+---------------+------------+---------------+---------------+-----+-----------+-------------+
|19 |29.0     |(73,[29],[1.0])|HS-grad     |0.0            |(16,[0],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
|20 |26.0     |(73,[26],[1.0])|HS-grad     |0.0            |(16,[0],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
|23 |10.0     |(73,[10],[1.0])|9th         |10.0           |(16,[10],[1.0])|<=50K|0.0        |(2,[0],[1.0])|
|23 |10.0     |(73,[10],[1.0])|HS-grad     |0.0            |(16,[0],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
|23 |10.0     |(73,[10],[1.0])|Some-college|1.0            |(16,[1],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
|24 |22.0     |(73,[22],[1.0])|HS-grad     |0.0            |(16,[0],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
|24 |22.0     |(73,[22],[1.0])|Bachelors   |2.0            |(16,[2],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
|25 |8.0      |(73,[8],[1.0]) |Bachelors   |2.0            |(16,[2],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
|27 |12.0     |(73,[12],[1.0])|11th        |5.0            |(16,[5],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
|29 |13.0     |(73,[13],[1.0])|7th-8th     |8.0            |(16,[8],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
+---+---------+---------------+------------+---------------+---------------+-----+-----------+-------------+
only showing top 10 rows
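In case the vector notation in the output is unfamiliar: an entry such as (73,[29],[1.0]) is Spark's compact display of a sparse vector of length 73 with a single 1.0 at position 29 and zeros everywhere else. The tiny sketch below (purely illustrative) builds that same value by hand.

from pyspark.ml.linalg import Vectors

# A sparse vector of length 73 with a 1.0 at index 29 -- the same value shown as (73,[29],[1.0]) above.
v = Vectors.sparse(73, [29], [1.0])
print(v)                # (73,[29],[1.0])
print(v.toArray()[29])  # 1.0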
- Write the final output DataFrame to HDFS as parquet files that are partitioned by the "sex" column
(transformed.write
    .format("parquet")
    .mode("overwrite")
    .partitionBy("sex")
    .save("hdfs://…/adult_ml_ready"))
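Once the files are written, other data scientists can read them straight back from HDFS. Below is a hedged example of what that might look like (the elided HDFS path simply mirrors the one used above); filtering on the partition column lets Spark read only the "sex" partitions it needs.

# Read the partitioned parquet data back; the "sex" partition column is reconstructed automatically,
# and filtering on it allows Spark to skip the partitions that are not needed.
ready_df = spark.read.parquet("hdfs://…/adult_ml_ready")
ready_df.filter(ready_df["sex"] == "Female").select("age", "class", "features").show(5)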
Spark Scala Solution
- Create DataFrame of categorical columns
val adult_cat_df = spark.read.format("csv")
  .option("sep", ",")
  .option("inferSchema", "true")
  .option("header", "false")
  .load("hdfs://…/adult/adult_data.csv")
  .toDF("age", "workclass", "fnlwgt", "education", "education-num", "marital-status",
        "occupation", "relationship", "race", "sex", "capital-gain", "capital-loss",
        "hours-per-week", "native-country", "class")
  .drop("fnlwgt", "education-num", "capital-gain", "capital-loss")
- Remove whitespaces from every cell in the DataFrame
import org.apache.spark.sql.functions.trim

val trimmed_df = adult_cat_df
  .withColumn("workclass", trim(adult_cat_df("workclass")))
  .withColumn("education", trim(adult_cat_df("education")))
  .withColumn("marital-status", trim(adult_cat_df("marital-status")))
  .withColumn("occupation", trim(adult_cat_df("occupation")))
  .withColumn("relationship", trim(adult_cat_df("relationship")))
  .withColumn("race", trim(adult_cat_df("race")))
  .withColumn("sex", trim(adult_cat_df("sex")))
  .withColumn("native-country", trim(adult_cat_df("native-country")))
  .withColumn("class", trim(adult_cat_df("class")))
- Drop duplicated rows
val dups_dropped_df = trimmed_df.dropDuplicates
- Replace every cell that is "?" with the null value
val replaced_questions_df = dups_dropped_df.na.replace("*", Map("?" -> null))
- Remove rows that have fewer than 9 non-null values
val clean_df = replaced_questions_df.na.drop(minNonNulls = 9)
- Split the cleaned DataFrame into training and testing DataFrames
val Array(training, testing) = clean_df.randomSplit(Array(0.7, 0.3), seed = 535)
- Create an ML Pipeline with StringIndexers, OneHotEncoderEstimator, and VectorAssembler, and apply it to the training and testing DataFrames
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.OneHotEncoderEstimator

val age_indexer = new StringIndexer().setInputCol("age").setOutputCol("age_index").setHandleInvalid("keep")
val workclass_indexer = new StringIndexer().setInputCol("workclass").setOutputCol("workclass_index").setHandleInvalid("keep")
val education_indexer = new StringIndexer().setInputCol("education").setOutputCol("education_index").setHandleInvalid("keep")
val marital_indexer = new StringIndexer().setInputCol("marital-status").setOutputCol("marital-status_index").setHandleInvalid("keep")
val occupation_indexer = new StringIndexer().setInputCol("occupation").setOutputCol("occupation_index").setHandleInvalid("keep")
val relationship_indexer = new StringIndexer().setInputCol("relationship").setOutputCol("relationship_index").setHandleInvalid("keep")
val race_indexer = new StringIndexer().setInputCol("race").setOutputCol("race_index").setHandleInvalid("keep")
val sex_indexer = new StringIndexer().setInputCol("sex").setOutputCol("sex_index").setHandleInvalid("keep")
val hours_indexer = new StringIndexer().setInputCol("hours-per-week").setOutputCol("hours-per-week_index").setHandleInvalid("keep")
val country_indexer = new StringIndexer().setInputCol("native-country").setOutputCol("native-country_index").setHandleInvalid("keep")
val class_indexer = new StringIndexer().setInputCol("class").setOutputCol("class_index").setHandleInvalid("keep")

val encoder = new OneHotEncoderEstimator()
  .setInputCols(Array("age_index", "workclass_index", "education_index", "marital-status_index",
    "occupation_index", "relationship_index", "race_index", "sex_index",
    "hours-per-week_index", "native-country_index", "class_index"))
  .setOutputCols(Array("age_vec", "workclass_vec", "education_vec", "marital-status_vec",
    "occupation_vec", "relationship_vec", "race_vec", "sex_vec",
    "hours-per-week_vec", "native-country_vec", "class_vec"))

val assembler = new VectorAssembler()
  .setInputCols(Array("age_vec", "workclass_vec", "education_vec", "marital-status_vec",
    "occupation_vec", "relationship_vec", "race_vec", "sex_vec",
    "hours-per-week_vec", "native-country_vec"))
  .setOutputCol("features")

val pipeline = new Pipeline()
  .setStages(Array(age_indexer, workclass_indexer, education_indexer, marital_indexer,
    occupation_indexer, relationship_indexer, race_indexer, sex_indexer,
    hours_indexer, country_indexer, class_indexer, encoder, assembler))

val model = pipeline.fit(training)
val transformed = model.transform(testing)

transformed.select("age", "age_index", "age_vec", "education", "education_index", "education_vec",
  "class", "class_index", "class_vec").show(10, truncate = false)
Output
+---+---------+---------------+------------+---------------+---------------+-----+-----------+-------------+
|age|age_index|age_vec        |education   |education_index|education_vec  |class|class_index|class_vec    |
+---+---------+---------------+------------+---------------+---------------+-----+-----------+-------------+
|19 |29.0     |(73,[29],[1.0])|HS-grad     |0.0            |(15,[0],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
|20 |26.0     |(73,[26],[1.0])|HS-grad     |0.0            |(15,[0],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
|23 |10.0     |(73,[10],[1.0])|9th         |10.0           |(15,[10],[1.0])|<=50K|0.0        |(2,[0],[1.0])|
|23 |10.0     |(73,[10],[1.0])|HS-grad     |0.0            |(15,[0],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
|23 |10.0     |(73,[10],[1.0])|Some-college|1.0            |(15,[1],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
|24 |22.0     |(73,[22],[1.0])|HS-grad     |0.0            |(15,[0],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
|24 |22.0     |(73,[22],[1.0])|Bachelors   |2.0            |(15,[2],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
|25 |8.0      |(73,[8],[1.0]) |Bachelors   |2.0            |(15,[2],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
|27 |12.0     |(73,[12],[1.0])|11th        |5.0            |(15,[5],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
|29 |13.0     |(73,[13],[1.0])|7th-8th     |8.0            |(15,[8],[1.0]) |<=50K|0.0        |(2,[0],[1.0])|
+---+---------+---------------+------------+---------------+---------------+-----+-----------+-------------+
only showing top 10 rows

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.OneHotEncoderEstimator
age_indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_ce85416a0455
workclass_indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_be118b443e19
education_indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_df938b4672a3
marital_indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_1af9515f5f9f
occupation_indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_01999795db5f
relationship_indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_6b3df0212148
race_indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_5dcce3d7bad7
sex_indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_a6243870ce44
hours_indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_42470fc0dc83
country_indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_3fee12e95018
class_indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_885c3517e457
encoder: org.apache.spark.ml.feature.OneHotEncoderEstimator = oneHotEncoder_72dc93dd8048
assembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_c5e2665cac69
pipeline: org.apache.spark.ml.Pipeline = pipeline_f18136bc5e3a
model: org.apache.spark.ml.PipelineModel = pipeline_f18136bc5e3a
transformed: org.apache.spark.sql.DataFrame = [age: int, workclass: string ... 32 more fields]
- Write the final output DataFrame to HDFS as parquet files that are partitioned by the "sex" column
transformed.write
  .format("parquet")
  .mode("overwrite")
  .partitionBy("sex")
  .save("hdfs://…/adult_ml_ready")