Introduction

In this section, we cover Pearson’s Chi-squared (χ²) test statistic in Spark. We also introduce Spark’s ML Pipelines and a new transformer: the StringIndexer.

The purpose of data, in general, is to help us make effective decisions. But how do we know with any certainty that our analysis will lead to better decisions? We rarely, if ever, have all the desired data about a business problem, an academic problem, or any problem for that matter. Therefore, it is impossible to know with absolute certainty whether our analysis is correct. This is where the science of statistics tries to provide insight into things that are otherwise unknowable. Statistics deals with samples of data that represent the entire population of data and tries to make reasonable assertions from those samples.

Hypothesis testing is a statistical procedure that uses sample data to decide whether to reject the null hypothesis or fail to reject it. It does this by asking how likely the observed result would be if chance alone were at work. There are only two possible outcomes: a statistically significant result, or a result that cannot be distinguished from chance. Hypothesis testing works with two statistical hypotheses: the null hypothesis and the alternative hypothesis. The null hypothesis is the default and states that nothing has changed, or that the result is purely due to chance. The alternative hypothesis states that some other cause is influencing the result. For example, to evaluate whether a coin is fair, the null hypothesis would be that heads and tails are equally likely, so about half the flips land heads and the other half land tails. The alternative hypothesis would be that heads and tails are not split fifty-fifty.
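To make the coin example concrete, here is a minimal sketch in plain Python (no Spark required). The flip counts are made up for illustration, and the function name coin_chi_squared is hypothetical. It computes Pearson’s Chi-squared statistic against the fair-coin null hypothesis and the matching p-value, using the closed form that holds for one degree of freedom.

```python
import math

def coin_chi_squared(heads, tails):
    """Chi-squared statistic and p-value under the null hypothesis of a fair coin."""
    total = heads + tails
    expected = total / 2  # a fair coin should land heads half the time
    statistic = ((heads - expected) ** 2 + (tails - expected) ** 2) / expected
    # Two outcomes give one degree of freedom. For one degree of freedom the
    # upper-tail probability of the chi-squared distribution is erfc(sqrt(x / 2)).
    p_value = math.erfc(math.sqrt(statistic / 2))
    return statistic, p_value

# Made-up sample: 100 flips, 60 heads and 40 tails.
statistic, p_value = coin_chi_squared(heads=60, tails=40)
print(f"statistic = {statistic:.2f}, p-value = {p_value:.4f}")
# prints: statistic = 4.00, p-value = 0.0455
```

With a commonly used significance level of 0.05, a p-value of about 0.0455 would lead us to reject the null hypothesis that the coin is fair.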

Pearson’s Chi-squared test is a type of hypothesis test that checks each feature column against the label column (the outcome column) for independence. Every feature column in the test is paired with the label column, and the Chi-squared statistic is computed for each pair. For the Chi-squared test, every column, both features and label, must be categorical, which means continuous columns cannot be used. Continuous columns hold numeric data that can vary continuously, so their distinct values cannot be counted because there are infinitely many of them. Examples are columns of measured numbers: weight, revenue, sales in dollars, and so on.
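As a rough illustration of what the test computes for one feature and label pair, the plain-Python sketch below builds a contingency table from (feature, label) pairs and derives the statistic and the degrees of freedom. The data and the function name chi_squared_independence are made up for this example. It is not how Spark implements the test internally.

```python
from collections import Counter

def chi_squared_independence(pairs):
    """Return (statistic, degrees_of_freedom) for a list of (feature, label) pairs."""
    observed = Counter(pairs)                                # cell counts
    feature_totals = Counter(feature for feature, _ in pairs)  # row totals
    label_totals = Counter(label for _, label in pairs)        # column totals
    n = len(pairs)

    statistic = 0.0
    for feature, feature_total in feature_totals.items():
        for label, label_total in label_totals.items():
            # Count we would expect in this cell if feature and label were independent.
            expected = feature_total * label_total / n
            statistic += (observed[(feature, label)] - expected) ** 2 / expected

    dof = (len(feature_totals) - 1) * (len(label_totals) - 1)
    return statistic, dof

# Made-up data: a two-level feature against a two-level label.
pairs = (
    [("A", "no")] * 10 + [("A", "yes")] * 20
    + [("B", "no")] * 30 + [("B", "yes")] * 40
)
statistic, dof = chi_squared_independence(pairs)
print(f"statistic = {statistic:.4f}, degrees of freedom = {dof}")
# prints: statistic = 0.7937, degrees of freedom = 1
```

Each distinct value becomes its own row or column of the table, which is why the columns must be categorical: a continuous column would produce a table with almost as many rows as there are records.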

Getting Data

The data set for this section is the Adult data set, available on data.world. Citation: Dua, D. and Graff, C. (2019). UCI Machine Learning Repository [http://archive.ics.uci.edu/ml]. Irvine, CA: University of California, School of Information and Computer Science.

Accessing the data on data.world requires a free account. After signing up, download the file “adult.data.csv” and rename it to “adult_data.csv”. The data set describes various personal characteristics of adults, mostly as categorical columns, with the aim of predicting whether an adult makes more than $50,000 per year. Its categorical columns make it a good fit for hypothesis testing.

Upload the CSV to your Hadoop cluster and save it in HDFS. Once it is in HDFS, create a DataFrame called adult_df from the CSV file. The CSV has no header row, so be sure to set header to false. We can name the columns with the toDF() method. The final code will look like:

PySpark:

adult_df = spark.read.format("csv").load("hdfs://…/adult/adult_data.csv"
                 , sep = ","
                 , inferSchema = "true"
                 , header = "false").toDF("age", "workclass", "fnlwgt", "education", "education-num", "marital-status", "occupation", "relationship", "race", "sex", "capital-gain", "capital-loss", "hours-per-week", "native-country", "class")

Spark Scala:

val adult_df = spark.read.format("csv")
  .option("sep", ",")
  .option("inferSchema", "true")
  .option("header", "false")
  .load("hdfs://…/adult/adult_data.csv").toDF("age", "workclass", "fnlwgt", "education", "education-num", "marital-status", "occupation", "relationship", "race", "sex", "capital-gain", "capital-loss", "hours-per-week", "native-country", "class")

This DataFrame has several columns we do not want. We ultimately want a DataFrame whose columns we can treat as categorical, so we drop the four columns we do not need and call the resulting DataFrame “adult_cat_df”.

PySpark:

adult_cat_df = adult_df.drop("fnlwgt", "education-num", "capital-gain", "capital-loss")

Spark Scala:

val adult_cat_df = adult_df.drop("fnlwgt", "education-num", "capital-gain", "capital-loss")

StringIndexer

Since we are dealing only with categorical columns, we need a new transformer for our ML pipeline: the StringIndexer. The StringIndexer converts a categorical column (either strings or numbers) into indices by numbering every unique entry, starting from zero and ending at one less than the number of unique entries in the column. By default the most frequent entry receives index zero. In this section each StringIndexer handles a single column, so to index multiple columns, as we will in the exercise below, we simply create more StringIndexers (Spark 3.0 and later can also index several columns at once through the inputCols and outputCols parameters). The ML Pipeline makes packaging them together very easy. The StringIndexer writes its result to a new column in the DataFrame.
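The indexing idea can be sketched in a few lines of plain Python. This is only an illustration of the behaviour described above, not Spark’s implementation. The function names and the sample values are made up, and breaking ties alphabetically is an assumption of this sketch.

```python
from collections import Counter

def fit_string_index(values):
    """Learn a label-to-index mapping, most frequent label first."""
    counts = Counter(str(v) for v in values)  # numbers are treated as strings
    # Sort by descending frequency; ties are broken alphabetically (assumption).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

def apply_string_index(values, mapping):
    """Produce the new index column from the learned mapping."""
    return [mapping[str(v)] for v in values]

# Made-up sample of a categorical column.
education = ["HS-grad", "Bachelors", "HS-grad", "Masters", "HS-grad", "Bachelors"]
mapping = fit_string_index(education)
indexed = apply_string_index(education, mapping)
print(mapping)  # {'HS-grad': 0.0, 'Bachelors': 1.0, 'Masters': 2.0}
print(indexed)  # [0.0, 1.0, 0.0, 2.0, 0.0, 1.0]
```

Three unique entries produce the indices 0, 1, and 2, and the original column is left untouched because the indices go into a separate list.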

Start by creating a StringIndexer with two parameters. The first parameter is the input column: the categorical column you would like converted into an index. The second parameter is the output column: the name of the new column that will be created. Normally, the output column is the input column name with “_index” appended, but you may name the new column however you like. A PySpark StringIndexer would look like:

from pyspark.ml.feature import StringIndexer

category_indexer = StringIndexer(
  inputCol = "category"
  , outputCol = "category_index"
)

A Spark Scala StringIndexer example would be:

import org.apache.spark.ml.feature.StringIndexer

val category_indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("category_index")

ML Pipelines

The purpose of Spark’s ML Pipeline API is to make it very easy to combine or chain algorithms together into one workflow. All the elements of an ML Pipeline are either transformers or estimators. A transformer implements the method transform() and usually turns one DataFrame into another by adding one or more columns to the original DataFrame. An estimator is an algorithm that trains on data and implements the method fit(). An estimator is fit on a DataFrame and outputs a model, which is itself a transformer.

To use Spark’s ML Pipelines, use the import from pyspark.ml import Pipeline in PySpark or import org.apache.spark.ml.Pipeline in Scala. Then define any indexers, vector assemblers, or other machine learning transformers and estimators you need. Once you have all the pieces, assemble them in a pipeline: create a Pipeline and pass it “stages”, a list of the variables that hold the individual transformers and estimators. The stages run in the order they are listed, from left to right, so make sure they follow a natural progression of transforming and processing the DataFrame. A hypothetical example using a DataFrame named “df” with made-up columns “category”, “sales_range”, and “prediction” would look like the following:

PySpark:

from pyspark.ml import Pipeline

pipeline = Pipeline(stages = [category_indexer, sales_range_indexer, prediction_indexer, assembler])

Spark Scala:

import org.apache.spark.ml.Pipeline

val pipeline = new Pipeline()
  .setStages(Array(category_indexer, sales_range_indexer, prediction_indexer, assembler))

Next we create a “model” by calling the fit() method on our pipeline and passing it a DataFrame. In the example below the variable “model” is a PipelineModel, which accepts data and runs it through the fitted stages of our pipeline.

PySpark

model = pipeline.fit(df)

Spark Scala:

val model = pipeline.fit(df)

Lastly, we call transform() on the newly created model, passing it the DataFrame, to produce our final DataFrame that has the output or prediction column. In our hypothetical example it would look like:

PySpark

transformed = model.transform(df)

Spark Scala:

val transformed = model.transform(df)
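The fit-then-transform flow can be mimicked with a toy pipeline in plain Python. Every class below (ToyIndexer, ToyIndexerModel, ToyAssembler, ToyPipeline, ToyPipelineModel) is hypothetical and exists only for this sketch. Rows are plain dictionaries rather than DataFrames, and the toy indexer numbers labels alphabetically to keep the code short. It shows one way estimators, transformers, and ordered stages could fit together, not how Spark implements them.

```python
class ToyIndexerModel:
    """Transformer: adds an index column using a learned mapping."""
    def __init__(self, input_col, output_col, mapping):
        self.input_col, self.output_col, self.mapping = input_col, output_col, mapping

    def transform(self, rows):
        return [{**row, self.output_col: self.mapping[row[self.input_col]]} for row in rows]


class ToyIndexer:
    """Estimator: fit() learns the mapping and returns a model (a transformer)."""
    def __init__(self, input_col, output_col):
        self.input_col, self.output_col = input_col, output_col

    def fit(self, rows):
        labels = sorted({row[self.input_col] for row in rows})  # alphabetical, for brevity
        mapping = {label: i for i, label in enumerate(labels)}
        return ToyIndexerModel(self.input_col, self.output_col, mapping)


class ToyAssembler:
    """Transformer: packs several columns into one list column."""
    def __init__(self, input_cols, output_col):
        self.input_cols, self.output_col = input_cols, output_col

    def transform(self, rows):
        return [{**row, self.output_col: [row[c] for c in self.input_cols]} for row in rows]


class ToyPipelineModel:
    """Fitted pipeline: applies every fitted stage in order."""
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class ToyPipeline:
    """Runs stages left to right, fitting estimators as it reaches them."""
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            rows = model.transform(rows)  # later stages see the new columns
            fitted.append(model)
        return ToyPipelineModel(fitted)


# Made-up rows with the hypothetical columns used in the text.
df = [{"category": "b", "sales_range": "low"}, {"category": "a", "sales_range": "high"}]
pipeline = ToyPipeline([
    ToyIndexer("category", "category_index"),
    ToyIndexer("sales_range", "sales_range_index"),
    ToyAssembler(["category_index", "sales_range_index"], "features"),
])
model = pipeline.fit(df)
transformed = model.transform(df)
print(transformed[0]["features"])  # [1, 1]
```

Placing the assembler before the indexers would fail here because the index columns would not exist yet, which is the reason the stage order matters.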

Let’s go through a detailed exercise and see Spark’s ML Pipeline in action.

Exercise 11: Spark’s Pearson’s Chi-squared test statistic

In this exercise we will use ML Pipelines to calculate Pearson’s Chi-squared test statistic.

  1. Add the PySpark or Scala imports needed for ML Pipelines and the Chi-squared test.

PySpark

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.stat import ChiSquareTest

Spark Scala:

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.stat.ChiSquareTest

  2. Create StringIndexers for the two feature columns, "age" and "education", and create a StringIndexer for the label column "class". These three columns will be converted to numerical columns so they can be used by the ML Pipeline.

PySpark

age_indexer = StringIndexer(
  inputCol = "age"
  , outputCol = "age_index"
)

education_indexer = StringIndexer(
  inputCol = "education"
  , outputCol = "education_index"
)

class_indexer = StringIndexer(
  inputCol = "class"
  , outputCol = "class_index"
)

Spark Scala:

val age_indexer = new StringIndexer()
  .setInputCol("age")
  .setOutputCol("age_index")

val education_indexer = new StringIndexer()
  .setInputCol("education")
  .setOutputCol("education_index")

val class_indexer = new StringIndexer()
  .setInputCol("class")
  .setOutputCol("class_index")

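These three columns are converted to indexes because the VectorAssembler and the Chi-squared test only work on numbers. The "education" and "class" columns are strings, so they must be transformed into numbers. The "age" column is already read as a number when inferSchema is on, and the StringIndexer simply treats every distinct age as its own category.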

  3. Use the VectorAssembler on the "age_index" and the "education_index" numerical columns to create a new column of vectors called "features".

PySpark

assembler = VectorAssembler(
  inputCols = ["age_index", "education_index"]
  , outputCol = "features"
)

Spark Scala:

val assembler = new VectorAssembler()
  .setInputCols(Array("age_index", "education_index"))
  .setOutputCol("features")

  4. Construct an ML Pipeline called “pipeline” whose stages are the three indexers and the assembler.

PySpark

pipeline = Pipeline(stages = [education_indexer, age_indexer, class_indexer, assembler])

Spark Scala:

val pipeline = new Pipeline()
  .setStages(Array(education_indexer, age_indexer, class_indexer, assembler))

  5. Call fit() on the pipeline with the “adult_cat_df” DataFrame. This returns a PipelineModel that we call “model”. Then call transform() on that model with the “adult_cat_df” DataFrame to add the four new columns: “age_index”, “education_index”, “class_index”, and “features”. Name the final DataFrame “transformed”.

PySpark

model = pipeline.fit(adult_cat_df)

transformed = model.transform(adult_cat_df)

Spark Scala:

val model = pipeline.fit(adult_cat_df)

val transformed = model.transform(adult_cat_df)

  6. We are now ready to perform our Chi-squared test. Call ChiSquareTest.test() with "transformed" as the first parameter, "features" (the column of assembled vectors) as the second parameter, and "class_index" (the name of the label column we are testing against) as the third parameter. Display the resulting DataFrame.

PySpark

chi_test = ChiSquareTest.test(transformed, "features", "class_index")

chi_test.show(truncate=False)

Spark Scala:

val chi_test = ChiSquareTest.test(transformed, "features", "class_index")

chi_test.show(truncate=false)

Output:

+---------+----------------+--------------------------------------+
|pValues  |degreesOfFreedom|statistics                            |
+---------+----------------+--------------------------------------+
|[0.0,0.0]|[72, 15]        |[3502.0364642305994,4429.653302288618]|
+---------+----------------+--------------------------------------+

  7. Lastly, we can extract the pValues, degreesOfFreedom, and statistics columns from the DataFrame and print the data in a nicer fashion. Follow the PySpark and Spark Scala code below:

PySpark

chi = chi_test.head()
print("pValues: " + str(chi.pValues))
print("degreesOfFreedom: " + str(chi.degreesOfFreedom))
print("statistics: " + str(chi.statistics))

Spark Scala:

val chi = chi_test.head
println(s"pValues: ${chi.getAs[Vector](0)}")
println(s"degreesOfFreedom: ${chi.getSeq[Int](1).mkString("[", ",", "]")}")
println(s"statistics: ${chi.getAs[Vector](2)}")

Output:

pValues: [0.0,0.0]
degreesOfFreedom: [72,15]
statistics: [3502.0364642305994,4429.653302288618]
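One way to read these numbers is sketched below in plain Python. The values are copied from the output above. The significance level of 0.05 and the assumption that the "class" label has exactly two categories are choices made for this sketch, not something stated in the exercise, and the helper names are hypothetical.

```python
# Values copied from the output above; the order follows inputCols in the assembler.
feature_names = ["age", "education"]
p_values = [0.0, 0.0]
degrees_of_freedom = [72, 15]

ALPHA = 0.05       # assumed significance level
LABEL_LEVELS = 2   # assumes the "class" label has exactly two categories

def decide(p_value, alpha=ALPHA):
    """Reject the null hypothesis of independence when the p-value is below alpha."""
    return "reject independence" if p_value < alpha else "fail to reject independence"

def implied_feature_levels(dof, label_levels=LABEL_LEVELS):
    """Invert dof = (feature_levels - 1) * (label_levels - 1)."""
    return dof // (label_levels - 1) + 1

decisions = [decide(p) for p in p_values]
levels = [implied_feature_levels(dof) for dof in degrees_of_freedom]

for name, p, decision, level in zip(feature_names, p_values, decisions, levels):
    print(f"{name}: p-value = {p} -> {decision}; implied distinct values = {level}")
# prints:
# age: p-value = 0.0 -> reject independence; implied distinct values = 73
# education: p-value = 0.0 -> reject independence; implied distinct values = 16
```

Under these assumptions both features would be judged dependent on the label. A printed p-value of 0.0 most likely means the value is too small to show at floating-point precision rather than being exactly zero.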

The full code example is below.

PySpark

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.stat import ChiSquareTest

age_indexer = StringIndexer(
  inputCol = "age"
  , outputCol = "age_index"
)

education_indexer = StringIndexer(
  inputCol = "education"
  , outputCol = "education_index"
)

class_indexer = StringIndexer(
  inputCol = "class"
  , outputCol = "class_index"
)

assembler = VectorAssembler(
  inputCols = ["age_index", "education_index"]
  , outputCol = "features"
)

pipeline = Pipeline(stages = [education_indexer, age_indexer, class_indexer, assembler])

model = pipeline.fit(adult_cat_df)

transformed = model.transform(adult_cat_df)

chi_test = ChiSquareTest.test(transformed, "features", "class_index")

chi_test.show(truncate=False)

chi = chi_test.head()
print("pValues: " + str(chi.pValues))
print("degreesOfFreedom: " + str(chi.degreesOfFreedom))
print("statistics: " + str(chi.statistics))

Spark Scala:

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.stat.ChiSquareTest

val age_indexer = new StringIndexer()
  .setInputCol("age")
  .setOutputCol("age_index")

val education_indexer = new StringIndexer()
  .setInputCol("education")
  .setOutputCol("education_index")

val class_indexer = new StringIndexer()
  .setInputCol("class")
  .setOutputCol("class_index")

val assembler = new VectorAssembler()
  .setInputCols(Array("age_index", "education_index"))
  .setOutputCol("features")

val pipeline = new Pipeline()
  .setStages(Array(education_indexer, age_indexer, class_indexer, assembler))

val model = pipeline.fit(adult_cat_df)

val transformed = model.transform(adult_cat_df)

val chi_test = ChiSquareTest.test(transformed, "features", "class_index")

chi_test.show(truncate=false)

val chi = chi_test.head
println(s"pValues: ${chi.getAs[Vector](0)}")
println(s"degreesOfFreedom: ${chi.getSeq[Int](1).mkString("[", ",", "]")}")
println(s"statistics: ${chi.getAs[Vector](2)}")

Output:

+---------+----------------+--------------------------------------+
|pValues  |degreesOfFreedom|statistics                            |
+---------+----------------+--------------------------------------+
|[0.0,0.0]|[72, 15]        |[3502.0364642305994,4429.653302288618]|
+---------+----------------+--------------------------------------+

pValues: [0.0,0.0]
degreesOfFreedom: [72, 15]
statistics: [3502.0364642305994,4429.653302288618]

Spark ML Pipelines make transforming data and applying machine learning algorithms very easy. We didn’t use any machine learning algorithms in this pipeline but we could have. A machine learning algorithm is simply an estimator that would get added right into the pipeline. 

