Introduction

At the end of the section, 2.3 DataFrame Cleaning, we stated that the objective of any data set is to be used to help us make decisions. Furthering that theme is the realm of statistics. At its base form, statistics is a science that uses mathematical analysis to draw conclusions about data. Some examples from statistics include sample mean, sample variance, sample quantiles, and test statistics to name a few. In this section we will cover the following built-in Spark statistical functions using DataFrames: Summarizer, Correlation, and Hypothesis Testing. However, this section does not intend to teach statistics or even be an introduction of statistics. Instead, this section will focus on using these built-in Spark statistical operations and introduce the concept of ML Pipelines that are used in creating machine learning pipelines.

Statistics is a science that uses mathematical analysis to draw conclusions about data

Getting Data

This section will use two different data sources to complete the exercises. The first data set is the “US Employment and Unemployment rates since 1940” and can be accessed via the DataHub page. The data set is a small data set that has population and labor numbers by year for the United States. It will be a good data set when teaching the VectorAssembler and the Summarizer. To get the data, click the big black “Download” button which will download a CSV file. Upload the CSV to your Hadoop cluster and save it in HDFS. Once in HDFS make a DataFrame called employment_df from the CSV file like the code below:

PySpark:

employment_df = spark.read.format("csv").load("hdfs://…/us_employment/aat1.csv"
                     , sep = ","
                     , inferSchema = "true"
                     , header = "true")

Spark Scala:

val employment_df = spark.read.format("csv")
  .option("sep", ",")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("hdfs://…/us_employment/aat1.csv")

The second data set we will be using in this section is an Australian electric company open data set. Navigate to the OpenML site and download the CSV file by clicking on the CSV icon in the top right corner of the screen. We will use this data set during our discussion of Correlation. Upload the CSV to your Hadoop cluster and save it in HDFS. Once in HDFS make a DataFrame called electricity_df from the CSV file like the code below:

PySpark:

electricity_df = spark.read.format("csv").load("hdfs://…/electric/electricity-normalized.csv"
                     , sep = ","
                     , inferSchema = "true"
                     , header = "true")

Spark Scala:

val electricity_df = spark.read.format("csv")
  .option("sep", ",")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("hdfs://…/electric/electricity-normalized.csv")

DataFrames and Vectors

In linear algebra a vector is an array of numbers where each value is called a scaler. Vectors are critical for machine learning because machine learning algorithms use vectors as input data. In Spark, a vector is called a DenseVector. The values of DenseVectors always have a data type of double and are a part of the class ml.linalg. In PySpark, creating DenseVectors looks like:

from pyspark.ml.linalg import Vectors

my_vector = Vectors.dense([4.0, 5.0, 0.0, 3.0])
print(my_vector)

Output:

[4.0,5.0,0.0,3.0]

In Spark Scala, creating DenseVectors looks like:

import org.apache.spark.ml.linalg.Vectors

val my_vector = Vectors.dense(4.0, 5.0, 0.0, 3.0)
println(my_vector)

Output:

[4.0,5.0,0.0,3.0]
import org.apache.spark.ml.linalg.Vectors
my_vector: org.apache.spark.ml.linalg.Vector = [4.0,5.0,0.0,3.0]

In linear algebra a vector is an array of numbers where each value is called a scaler

In machine learning a feature vector is a “row” of data where each scaler corresponds to the columns (called “features”) of the data set. So in Spark we will convert a DataFrame into DenseVectors of decimal numbers that then can be fed to a machine learning algorithm. 

In order to transform a DataFrame of data into a DenseVector of decimal scalers, Spark uses a concept called ML Pipelines to create machine learning pipelines. ML Pipelines are a series of high-level APIs that use data from a DataFrame and perform operations on that data called transformers or estimators . The journey from raw DataFrame to machine learning prediction requires many steps and operations. The goal of Spark’s ML Pipelines is to standardize these steps into logical phases that are easy to implement. A transformer is an algorithm that inputs a DataFrame and outputs another DataFrame with the data transformed. An estimator is an algorithm that is applied to a DataFrame to produce another transformer. Machine learning algorithms like regression or random forests are estimators because it trains on a DataFrame and produces a model. This section will cover several essential transformers using the ML Pipelines framework.

VectorAssembler

The principal transformer we will be using is the VectorAssembler. The VectorAssembler assembles a column or columns of data from a DataFrame and produces a new column of vectors of the data from the columns. It is ideal for combining columns, which are called features, into a single column of vectors that can be used in machine learning algorithms. The VectorAssembler only accepts columns with data types of integer, long, double, boolean, and vector. Dates, timestamps, and string data types cannot be assembled into vectors because vectors can only be decimal numbers. Later in the section we will show how we can overcome this obstacle using string indexing and one-hot-encoding.

To illustrate the VectorAssembler, if three numeric columns are being assembled, the first row of data and header of a made-up DataFrame would look like: 

 id  | code | sales  | 
-----|------|--------|
 101 |  21  | 535.14 |

After using the VectorAssembler to transform the data, the result is the original DataFrame plus a new column, “features“, where each row is a vector comprising of all the data in the three columns. The first row of the new DataFrame would look like:

id  | code | sales  | features              |
----|------|--------|-----------------------|
101 |  21  | 535.14 | [101.0, 21.0, 535.14] |

To use the VectorAssembler we must first import the class from the ml package. In PySpark that would be from pyspark.ml.feature import VectorAssembler and in Scala that would be import org.apache.spark.ml.feature.VectorAssembler

To use the VectorAssembler in PySpark, set it equal to a variable, normally called something like “assembler“, and it always uses two parameters, inputCols and outputColinputCols can be one or more columns specified in a list but outputCol is always a single column given as a string. Normally, the output column name is "features" but it can be anything you would like. Below is a complete example:

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
  inputCols = ['year', 'population', 'labor_force', 'population_percent', 'employed_total', 'employed_percent', 'agrictulture_ratio', 'nonagriculture_ratio', 'unemployed', 'unemployed_percent', 'not_in_labor']
  , outputCol = "features"
)

To use the assembler, call the transform() method on the assembler variable and pass a DataFrame. Set this to a new variable name. The result is a new DataFrame with an additional new column that we called "features" where each row is a vector of the columns we specified. This would look like:

output = assembler.transform(employment_df)

The "features" column looks like:

output.select("features").show(10, truncate=False)

Output:

+-----------------------------------------------------------------------------+
|features                                                                     |
+-----------------------------------------------------------------------------+
|[1941.0,99900.0,55910.0,56.0,50350.0,50.4,9100.0,41250.0,5560.0,9.9,43990.0] |
|[1942.0,98640.0,56410.0,57.2,53750.0,54.5,9250.0,44500.0,2660.0,4.7,42230.0] |
|[1943.0,94640.0,55540.0,58.7,54470.0,57.6,9080.0,45390.0,1070.0,1.9,39100.0] |
|[1944.0,93220.0,54630.0,58.6,53960.0,57.9,8950.0,45010.0,670.0,1.2,38590.0]  |
|[1945.0,94090.0,53860.0,57.2,52820.0,56.1,8580.0,44240.0,1040.0,1.9,40230.0] |
|[1946.0,103070.0,57520.0,55.8,55250.0,53.6,8320.0,46930.0,2270.0,3.9,45550.0]|
|[1947.0,106018.0,60168.0,56.8,57812.0,54.5,8256.0,49557.0,2356.0,3.9,45850.0]|
|[1947.0,101827.0,59350.0,58.3,57038.0,56.0,7890.0,49148.0,2311.0,3.9,42477.0]|
|[1948.0,103068.0,60621.0,58.8,58343.0,56.6,7629.0,50714.0,2276.0,3.8,42447.0]|
|[1949.0,103994.0,61286.0,58.9,57651.0,55.4,7658.0,49993.0,3637.0,5.9,42708.0]|
+-----------------------------------------------------------------------------+
only showing top 10 rows

In Scala, using the VectorAssembler is very similar to PySpark, but with a few minor differences. The first difference is that in Scala to use the VectorAssembler you use the new keyword. In PySpark you would directly call VectorAssembler. In Scala this would look like val assembler = new VectorAssembler(). The other difference is the input and output methods are named slightly different. In Scala, use setInputCols and setOutputCol. Other than that everything is the same. The complete Spark Scala code is:

import org.apache.spark.ml.feature.VectorAssembler

val assembler = new VectorAssembler()
  .setInputCols(Array("year", "population", "labor_force", "population_percent", "employed_total", "employed_percent", "agrictulture_ratio", "nonagriculture_ratio", "unemployed", "unemployed_percent", "not_in_labor"))
  .setOutputCol("features")

val assembled = assembler.transform(employment_df)

assembled.show(truncate=false)

Now let’s use the VectorAssembler with our first statistical function: the Summarizer. 

Summarizer

The Summarizer is a Spark statistical operation to return summary statistics on vector columns. The statistical metrics that are available include maximum, minimum, mean, variance, number of non-zeros, and count. The Summarizer method is a good tool to use for getting various statistics on a new vector column when using machine learning pipelines. To use the Summarizer, import the package from pyspark.ml.stat import Summarizer in PySpark and import org.apache.spark.ml.stat.Summarizer in Spark Scala. 

The Summarizer package has a method metrics() which is used to specify multiple summary statistics at once. It accepts one or many of the following parameters: "count""min""max""mean""variance""numNonZeros“. 

Exercise 9: Summary Statistics of Vector columns with Summarizer

  1. Add the import for the VectorAssembler and Summarizer. Then create a VectorAssembler with a variable name “assembler“. In this exercise we will use the “employment_df” DataFrame. Include every column except "footnotes" in the input columns and set the output column to "features". Call the assembler on the employment_df with the transform method.

PySpark:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Summarizer

assembler = VectorAssembler(
  inputCols = ['year', 'population', 'labor_force', 'population_percent', 'employed_total', 'employed_percent', 'agrictulture_ratio', 'nonagriculture_ratio', 'unemployed', 'unemployed_percent', 'not_in_labor']
  , outputCol = "features"
)

assembled = assembler.transform(employment_df)

Spark Scala:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.stat.Summarizer

val assembler = new VectorAssembler()
  .setInputCols(Array("year", "population", "labor_force", "population_percent", "employed_total", "employed_percent", "agrictulture_ratio", "nonagriculture_ratio", "unemployed", "unemployed_percent", "not_in_labor"))
  .setOutputCol("features")

val assembled = assembler.transform(employment_df)
  1. Call Summarizer with the metrics() method with "max" and "mean" as parameters. Then call the summary() method and pass vector columns as parameters. Since we called our assembled column “features” call this column by index and set the operation to a variable called “summarizer“. 

PySpark:

summarizer = Summarizer.metrics("max", "mean").summary(assembled["features"])

Spark Scala:

val summarizer = Summarizer.metrics("max", "mean").summary(assembled("features"))
  1. Lastly, use the assembled DataFrame and call the select() method and pass the summarizer variable from Step 2. Display the DataFrame and display all characters of the cells by turning off truncate.

PySpark:

assembled.select(summarizer).show(truncate=False)

Spark Scala:

assembled.select(summarizer).show(truncate=false)
  1. The final result will be a one-column DataFrame with one row. The cell will include two vectors. The first being the maximum of every vector column and the second vector will be the average of every vector column we specified in the VectorAssembler. The output will be:
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|aggregate_metrics(features, 1.0)                                                                                                                                                                                                                                                               |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[[2010.0,237830.0,154287.0,67.1,146047.0,64.4,9250.0,143952.0,14825.0,9.9,83941.0], [1975.0985915492959,156272.52112676064,98707.49295774644,62.23098591549296,93033.57746478873,58.77323943661971,4456.492957746477,88577.02816901408,5673.774647887321,5.5098591549295755,57564.92957746477]]|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

This exercise showed how to use the Summarizer to return summary statistics on vectors from the Vector Assembler. 

Instead of supplying multiple summary statistics at once, we can select only one. When supplying only one summary statistic the metrics() method is no longer necessary and the operation can be performed all in one line. Notice in the code below we are calling the variance method directly on the Summarizer method. 

The PySpark code looks like:

assembled.select(Summarizer.variance(assembled.features)).show(truncate=False)

The Spark Scala code looks like:

assembled.select(Summarizer.variance(assembled("features"))).show(truncate=false)

Correlation

Correlation is a statistics concept that analyzes the relationship between two variables. The output of a correlation between two variables is any real number between negative one (-1) and positive one (1). If the correlation is a positive number that means that both values increase together. And if the correlation is negative that means when one value increases the other decreases and vice versa. The closer the correlation number is to positive one the higher the positive correlation is and the closer the correlation number is to negative one the higher the negative correlation is. A correlation value at or around zero means there is no correlation.

Spark has a built-in method Correlation.corr() that is used to find the correlation of every numeric column in the DataFrame. Correlation on a DataFrame can help us determine whether columns in a DataFrame have a mathematical relationship or not. The output of the Correlation.corr() method is a correlation matrix. A correlation matrix is a table displaying the correlation values between each column in the DataFrame. The number of columns in the DataFrame are equal in size and order to the columns and rows of the correlation matrix. In Spark the final output of a correlation matrix (after formatting the outcome) would look similar to the following:

+---------------------+---------------------+---------------------+--------------------+
|_1                   |_2                   |_3                   |_4                  |
+---------------------+---------------------+---------------------+--------------------+
|1.0                  |0.004846468719879288 |8.854523499672676E-15|-0.15142773186371963|
|0.004846468719879288 |1.0                  |1.884518907571807E-14|0.003291677824776625|
|8.854523499672676E-15|1.884518907571807E-14|1.0                  |0.10415527325723785 |
|-0.15142773186371963 |0.003291677824776625 |0.10415527325723785  |1.0                 |
+---------------------+---------------------+---------------------+--------------------+

Along the diagonal the correlation values will always be one because the correlation between the same column is a perfect “1“. All other values in the matrix are the correlation values between two different columns. It is important to note that all values above the diagonal 1.0 are identical to the values below the diagonal. So only one triangle of the matrix has to evaluated.

Below is a Correlation Matrix taken from this article on Correlation. It is a great example of a Correlation Matrix with all the different pieces clearly marked. The columns of the original data (“Income 2010”, “Income 2011”, “Income 2012”, “Income 2013”, “Income 2014”) make up the rows and columns of the Correlation Matrix. The diagonal line outlined in a red border is always 1.000 because a column correlated to itself has a value of 1.000. The bottom half of the matrix is shaded grey to easily show that the bottom half is identical to the top half. So it is only necessary to look at one half to understand the matrix.

https://spss-tutorials.com/img/correlation-coefficient-spss-correlation-matrix.png

In the following exercise we will use Correlation to output the correlation matrix of DataFrame columns.

Exercise 10: Correlation of columns in a DataFrame

  1. Specify the imports for the VectorAssembler and Correlation. As before create the VectorAssembler to create a variable assembler. Then use the transform() method with the electricity_df DataFrame to create a DataFrame called assembled

PySpark:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

assembler = VectorAssembler(
  inputCols = ["date", "day", "period", "nswprice", "nswdemand", "vicprice", "vicdemand", "transfer"]
  , outputCol = "features"
)

assembled = assembler.transform(electricity_df)

Spark Scala:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.stat.Correlation
import org.apache.spark.ml.linalg.Matrix

val assembler = new VectorAssembler()
  .setInputCols(Array("date", "day", "period", "nswprice", "nswdemand", "vicprice", "vicdemand", "transfer"))
  .setOutputCol("features")

val assembled = assembler.transform(electricity_df)
  1. Use the Correlation method and call the corr() method. The first parameter is the DataFrame that includes a column of vectors and the second parameter is the name of the vector column "features". In the PySpark example we are using the default correlation method "pearson". In the Scala example we are using the method "spearman"

PySpark:

pearson_corr = Correlation.corr(assembled, "features")

Spark Scala:

val coeff_df = Correlation.corr(assembled, "features", "spearman")

The results will be different because the PySpark correlation is using the "pearson" calculation and the Spark Scala correlation is using the "spearman" calculation.

  1. The output is a DataFrame but the matrix data returned is not easily printed or displayed. In PySpark we can convert the output into a transformed DataFrame where each cell is a coefficient in the matrix. To do this we convert the DataFrame to a Row with head()[0] and then use the Python NumPy toArray() to make a nested array corresponding to a matrix. Then wrap everything in a list. Lastly, use the list to make a DataFrame. 
    In Spark Scala, convert the DataFrame to a Row then parallelize the data as a RDD. Lastly, print each line of the RDD.

PySpark:

corr_list = pearson_corr.head()[0].toArray().tolist()
pearson_corr_df = spark.createDataFrame(corr_list)
pearson_corr_df.show(truncate=False)

Spark Scala:

val Row(coeff_matrix: Matrix) = coeff_df.head

val matrix_rdd = spark.sparkContext.parallelize(coeff_matrix.rowIter.toSeq)
matrix_rdd.take(coeff_matrix.numRows).foreach(println)

The outputs of the PySpark and Scala code in step 3 are below.

PySpark:

+---------------------+---------------------+---------------------+--------------------+---------------------+---------------------+--------------------+---------------------+
|_1                   |_2                   |_3                   |_4                  |_5                   |_6                   |_7                  |_8                   |
+---------------------+---------------------+---------------------+--------------------+---------------------+---------------------+--------------------+---------------------+
|1.0                  |0.004846468719879288 |8.854523499672676E-15|-0.15142773186371963|0.06674767558474778  |0.00322459051824594  |0.055323483421909056|0.4050120407307455   |
|0.004846468719879288 |1.0                  |1.884518907571807E-14|0.003291677824776625|-0.052671966082517084|1.2265263118046266E-4|0.07771631447740471 |-0.056158731570133416|
|8.854523499672676E-15|1.884518907571807E-14|1.0                  |0.10415527325723785 |0.4455472215923164   |0.023245493528757318 |0.1929449450742059  |-0.11697903274792726 |
|-0.15142773186371963 |0.003291677824776625 |0.10415527325723785  |1.0                 |0.30495209655257954  |0.2860717177157712   |0.30727480053089823 |-0.27500093943730375 |
|0.06674767558474778  |-0.052671966082517084|0.4455472215923164   |0.30495209655257954 |1.0                  |0.08639342686907765  |0.6685009007912806  |-0.2678965336407084  |
|0.00322459051824594  |1.2265263118046266E-4|0.023245493528757318 |0.2860717177157712  |0.08639342686907765  |1.0                  |0.12755315891864022 |-0.08416615931743009 |
|0.055323483421909056 |0.07771631447740471  |0.1929449450742059   |0.30727480053089823 |0.6685009007912806   |0.12755315891864022  |1.0                 |-0.5555251431497068  |
|0.4050120407307455   |-0.056158731570133416|-0.11697903274792726 |-0.27500093943730375|-0.2678965336407084  |-0.08416615931743009 |-0.5555251431497068 |1.0                  |
+---------------------+---------------------+---------------------+--------------------+---------------------+---------------------+--------------------+---------------------+

Spark Scala:

[1.0,0.004318112026503217,7.247415595397252E-14,-0.2969074177180276,0.05408642576528982,-0.19761535994841303,0.022438556398995547,0.38466982877972583]
[0.004318112026503217,1.0,-7.097209265189128E-13,0.018642492948788388,-0.06264194952234474,0.05942740019308374,0.060565570201120085,-0.046088285301379546]
[7.247415595397252E-14,-7.097209265189128E-13,1.0,0.13595503500101705,0.41403481586008606,0.11299524005300357,0.18831920547656122,-0.10671430125625234]
[-0.2969074177180276,0.018642492948788388,0.13595503500101705,1.0,0.3276876341958795,0.8558708398976153,0.3775894822099036,-0.4326387834075226]
[0.05408642576528982,-0.06264194952234474,0.41403481586008606,0.3276876341958795,1.0,0.324716431874232,0.6613596233391824,-0.24471968787602386]
[-0.19761535994841303,0.05942740019308374,0.11299524005300357,0.8558708398976153,0.324716431874232,1.0,0.488006405824276,-0.48398201851129763]
[0.022438556398995547,0.060565570201120085,0.18831920547656122,0.3775894822099036,0.6613596233391824,0.488006405824276,1.0,-0.5055826343561686]
[0.38466982877972583,-0.046088285301379546,-0.10671430125625234,-0.4326387834075226,-0.24471968787602386,-0.48398201851129763,-0.5055826343561686,1.0]

The final code all together is:

PySpark:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

assembler = VectorAssembler(
  inputCols = ["date", "day", "period", "nswprice", "nswdemand", "vicprice", "vicdemand", "transfer"]
  , outputCol = "features"
)

assembled = assembler.transform(electricity_df)

pearson_corr = Correlation.corr(assembled, "features")

corr_list = pearson_corr.head()[0].toArray().tolist()
pearson_corr_df = spark.createDataFrame(corr_list)
pearson_corr_df.show(truncate=False)

Spark Scala:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.stat.Correlation
import org.apache.spark.ml.linalg.Matrix

val assembler = new VectorAssembler()
  .setInputCols(Array("date", "day", "period", "nswprice", "nswdemand", "vicprice", "vicdemand", "transfer"))
  .setOutputCol("features")

val assembled = assembler.transform(electricity_df)

val coeff_df = Correlation.corr(assembled, "features", "spearman")

val Row(coeff_matrix: Matrix) = coeff_df.head

val matrix_rdd = spark.sparkContext.parallelize(coeff_matrix.rowIter.toSeq)
matrix_rdd.take(coeff_matrix.numRows).foreach(println)

Correlation is a very powerful statistical operation and Spark makes it very easy to perform on DataFrames.


Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

This site uses Akismet to reduce spam. Learn how your comment data is processed.