Introduction

Aggregation means combining data into one or more groups and performing statistical operations such as average, maximum, and minimum on those groups. In Spark it is easy to group by a categorical column and perform the needed operation, and DataFrames allow multiple groupings and multiple aggregations at once.

Note

This section uses a DataFrame called df, in both PySpark and Spark Scala, created from the Parquet files found at the GitHub repository https://github.com/Teradata/kylo/tree/master/samples/sample-data. In Section 1.3 we demonstrated copying this data to HDFS.

df = spark.read.parquet("hdfs://user/your_user_name/sample-data/parquet/*.parquet")

Built-in Statistical Aggregations

Before we introduce programmatically constructing groupings and aggregations on DataFrames, note that Spark has several built-in functions that compute statistics on the columns of a DataFrame. These functions, describe() and summary(), perform basic calculations on numeric and string columns and are intended for exploratory data analysis. This chapter introduces and describes these functions; in Chapter 2, Advanced DataFrames, we will explore making decisions based upon the information returned from exploratory data analysis.

The describe() function is called on a DataFrame and outputs a new DataFrame with rows containing the count, mean, stddev, min, and max for each numeric or string column. The resulting DataFrame always has five rows, and the number of columns returned equals the number of string or numeric columns in the original DataFrame. If no columns are given to the describe() function, every numeric or string column is included; columns with a type of date or datetime are not returned. For numeric columns all of the statistics are computed, but for string columns only count, min, and max are returned: min and max give the alphabetically first and last values, while mean and stddev output null because those calculations cannot be evaluated on string values. The describe() function can also be given column names to select specific columns to return.

The summary() function is similar to describe() but provides more detail. A major difference is that summary() accepts statistical operation names instead of column names. Available statistics include count, mean, stddev, min, max, and approximate percentiles specified as percentages; any percentile from 1% to 99% can be passed as a parameter. If no statistics are specified, then count, mean, stddev, min, the 25%, 50%, and 75% percentiles, and max are returned.

To limit the columns returned use the select() method on the DataFrame before calling summary().

Exercise 19: Statistical Aggregation Functions

  1. In PySpark or Spark Scala call the describe() method on our DataFrame and display the results with the following code:
df.describe().show()
+-------+-----------------+----------+---------+-------------------+------+------------+--------------------+--------+---------+-----------------+----------------+--------------------+
|summary|               id|first_name|last_name|              email|gender|  ip_address|                  cc| country|birthdate|           salary|           title|            comments|
+-------+-----------------+----------+---------+-------------------+------+------------+--------------------+--------+---------+-----------------+----------------+--------------------+
|  count|             4999|      5000|     5000|               5000|  5000|        5000|                5000|    5000|     5000|             4689|            5000|                4966|
|   mean|500.5987197439488|      null|     null|               null|  null|        null|3.285845826615411...|    null|     null|150772.2228897419|        61871.37| 6.81818181818182E94|
| stddev|288.6483309321937|      null|     null|               null|  null|        null|1.285453801666783...|    null|     null|78171.51306234104|             NaN|2.530178709610778E95|
|    min|                1|          |    Adams|                   |      |0.103.25.187|                    |"Bonaire|         |         12068.96|                |                    |
|    max|             1000|    Willie|    Young|wyoungdo@dion.ne.jp|  Male| 99.96.79.65|  677198569353795843|Zimbabwe| 9/9/1994|        286735.82|Web Developer IV|      𠜎𠜱𠝹𠱓𠱸𠲖𠳏   |
+-------+-----------------+----------+---------+-------------------+------+------------+--------------------+--------+---------+-----------------+----------------+--------------------+
  2. To limit the columns of the aggregate statistics, pass string column names to the describe() method. The PySpark or Spark Scala code for two columns would be:
df.describe("last_name", "salary").show()
+-------+---------+-----------------+
|summary|last_name|           salary|
+-------+---------+-----------------+
|  count|     5000|             4689|
|   mean|     null|150772.2228897419|
| stddev|     null|78171.51306234104|
|    min|    Adams|         12068.96|
|    max|    Young|        286735.82|
+-------+---------+-----------------+
  3. In PySpark or Spark Scala, use the select() method to limit the summary statistics to just the "salary" column and then call the summary() and show() methods as shown in the following code:
df.select("salary").summary().show()
+-------+-----------------+
|summary|           salary|
+-------+-----------------+
|  count|             4689|
|   mean|150772.2228897419|
| stddev|78171.51306234104|
|    min|         12068.96|
|    25%|         83480.38|
|    50%|        152877.19|
|    75%|        215405.22|
|    max|        286735.82|
+-------+-----------------+
  4. To request specific statistics and percentiles, pass them as arguments to the summary() method. The PySpark or Spark Scala code looks like:
df.select("id", "salary").summary("mean", "stddev", "15%", "66%").show()
+-------+-----------------+-----------------+
|summary|               id|           salary|
+-------+-----------------+-----------------+
|   mean|500.5987197439488|150772.2228897419|
| stddev|288.6483309321937|78171.51306234104|
|    15%|              151|         55371.91|
|    66%|              661|        192806.71|
+-------+-----------------+-----------------+

The describe() and summary() methods are quick and easy built-in functions that provide a snapshot of the data in your DataFrame.

Crosstab

The crosstab() function returns a DataFrame of the pair-wise frequency between two columns. Pair-wise frequency is the count of each unique pair of values; in our case, it is the count of each unique pair of values from two columns of a DataFrame. The crosstab() function takes exactly two parameters, which must be column names of the DataFrame. The columns don’t have to be string columns, but if they are not categorical in nature the output DataFrame will be large and of little use. It will be helpful to look at an example.

In PySpark, we can call the crosstab() method directly on our DataFrame. In the following example, the first parameter, "country", makes up the rows of the resulting DataFrame, and the unique values of the second parameter, "gender", make up the columns of the resulting DataFrame obtained from the following code:

df.crosstab("country", "gender").show(10)

To use the crosstab() function in Scala, first call the stat method. The stat functions are a special subset of statistical functions used on DataFrames. In Python they are baked into the normal DataFrame operations, but in Scala they live in a separate class, so you must first call stat on the DataFrame, as in df.stat. We will learn more about the statistical functions in Chapter 2, Advanced DataFrames.

In this Spark Scala example, we call the stat function on the DataFrame and then call the crosstab method with the "country" and "gender" columns using the following code:

df.stat.crosstab("country", "gender").show(10)

Running either the preceding PySpark or Spark Scala code displays the following:

+--------------+---+------+----+
|country_gender|   |Female|Male|
+--------------+---+------+----+
|         Haiti|  0|     1|   2|
|       Vanuatu|  1|     1|   1|
|          Iraq|  0|     4|   2|
|   South Sudan|  0|     1|   1|
|        Poland| 12|    84|  82|
|    East Timor|  0|     1|   1|
|    Martinique|  0|     0|   1|
|        Jordan|  1|     2|   1|
|         Gabon|  0|     0|   1|
|   Philippines| 13|   139| 101|
+--------------+---+------+----+
only showing top 10 rows

In the resulting DataFrame the first column lists each distinct value in the "country" column; it appears first because we passed it as the first parameter. The remaining columns are the distinct values of the second parameter, "gender", and hold the counts of rows that contain each combination of the two values. The second column holds the counts of rows whose "gender" value is blank or null. If we swapped the two parameters, as in df.crosstab("gender", "country").show(10), the resulting DataFrame would have three rows (Male, Female, and blank) and many columns, one for each distinct country name.
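Because the swapped version is too wide to display in full, one option is to select just a few of its columns. The following is a minimal PySpark sketch; the first column of a crosstab result is named after the two input columns ("gender_country" here), and "Poland" and "Philippines" are simply two of the country values seen above, chosen for illustration:

swapped = df.crosstab("gender", "country")
# Keep the label column plus two country columns so the output stays narrow
swapped.select("gender_country", "Poland", "Philippines").show()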

groupBy and Aggregations

The groupBy() method groups column data together so that aggregation functions can be applied to each group. groupBy() can be called with no parameters or with one or more column names.

Note

It is important to point out that in PySpark the methods groupBy() and groupby() are equivalent; groupby() is simply an alias.

groupBy() returns what Spark calls GroupedData. GroupedData needs functions applied to each group in order to be useful; without them, the data has merely been grouped and nothing has been computed. The main functions used in conjunction with groupBy() include the following: agg(), avg(), count(), max(), mean(), min(), and sum(). The functions avg() and mean() are equivalent.
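As a minimal PySpark sketch of that last point, the following two commands produce identical output, because mean() is simply another name for avg() on grouped data:

df.groupBy("gender").avg("salary").show()
df.groupBy("gender").mean("salary").show()  # same result as avg()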

It is important to note, and will come up later, that the preceding functions (avg(), count(), max(), mean(), min(), and sum()) belong to GroupedData and are different from the functions of the same names that are applied to the columns of a DataFrame.

There are many options in Spark for grouping and aggregating data. Let’s start with basic examples and work up to more complex variants. 

If no parameters are passed to groupBy(), Spark puts every row of the DataFrame into a single group. Thus only one row is returned: the average, sum, min, or max of each entire numeric column.

Exercise 20: groupBys with No Parameters

  1. In PySpark or Spark Scala, return the average of each numeric column by calling the groupBy() method followed by the avg() function, and display the resulting DataFrame using the following code:
df.groupBy().avg().show()
+-----------------+-----------------+
|          avg(id)|      avg(salary)|
+-----------------+-----------------+
|500.5987197439488|150772.2228897419|
+-----------------+-----------------+
  2. In PySpark or Spark Scala, return the sum of each numeric column using the following code:
df.groupBy().sum().show()
+-------+-------------------+
|sum(id)|        sum(salary)|
+-------+-------------------+
|2502493|7.069709531299998E8|
+-------+-------------------+
  3. To return the aggregate of a particular column, supply a quoted column name as a parameter to the aggregate function. To get the maximum value of the "salary" column, pass the quoted string "salary" to the max() function using the following code:
df.groupBy().max("salary").show()
+-----------+
|max(salary)|
+-----------+
|  286735.82|
+-----------+

The groupBy() method forms the basis of every exercise in the remainder of this section. It is a powerful method that offers fine-grained control over any kind of aggregate calculation.

To split the data into more groups than a single all-encompassing one, pass a column name to the groupBy() method. It accepts quoted string column names, columns selected by attribute, columns selected by index, and Scala $-notation.

In the next exercise we will use the count() function. count() is unique among the GroupedData methods in that it takes no parameters, because it simply counts the rows in each group.

Exercise 21: groupBys with a Single Parameter

  1. In PySpark or Spark Scala, return the number of rows for each distinct group in the "gender" column using the count() function. The count() function takes no parameters and returns the number of rows in each group. We can use it in conjunction with the groupBy() method to count the Male and Female values. Note that the "gender" column contains blank values, which count as a distinct value and therefore form their own group in the groupBy:
df.groupBy("gender").count().show()
+------+-----+
|gender|count|
+------+-----+
|Female| 2391|
|  Male| 2300|
|      |  309|
+------+-----+
  2. In PySpark, to find the average of each numeric column for each group in the "gender" column, call the avg() function after the groupBy() method with the "gender" column as the parameter. The following two PySpark examples show the average and the minimum values for each group in the "gender" column, selecting the column by index and by attribute respectively:
df.groupby(df["gender"]).avg().show()
+------+------------------+------------------+
|gender|           avg(id)|       avg(salary)|
+------+------------------+------------------+
|Female|496.99205353408615|147013.72066499369|
|  Male| 500.2448890822097|154682.83160139245|
|      | 531.1391585760517|              null|
+------+------------------+------------------+
df.groupby(df.gender).min().show()
+------+-------+-----------+
|gender|min(id)|min(salary)|
+------+-------+-----------+
|Female|      1|   12121.98|
|  Male|      1|   12068.96|
|      |      1|       null|
+------+-------+-----------+
  3. In Spark Scala, to find the maximum value for each group use the max() function. The following code uses the Scala $-notation to select the column:
import spark.implicits._
df.groupBy($"gender").max().show()
+------+-------+-----------+
|gender|max(id)|max(salary)|
+------+-------+-----------+
|Female|   1000|  286735.82|
|  Male|    999|  286388.01|
|      |    998|       null|
+------+-------+-----------+

In this exercise we saw how to use groupBy() on a column to calculate statistics on the groups of data within that column.


Instead of returning the aggregation for every numeric column, we can limit the output to specific columns. We can groupBy() a single column and then choose which column to aggregate by passing that column's name to the aggregate function. As discussed before, these aggregate functions only take column names given as quoted strings.

Exercise 22: Single groupBy on Specific Aggregate Function Column(s)

  1. In PySpark or Spark Scala, return the average of the "salary" column for each group in the "gender" column by passing the column name "salary" to the avg() function using the following code:
df.groupBy("gender").avg("salary").show()
+------+------------------+
|gender|       avg(salary)|
+------+------------------+
|Female|147013.72066499369|
|  Male|154682.83160139245|
|      |              null|
+------+------------------+
  2. We can also show aggregate values for more than one column. Pass multiple comma-separated quoted column names to the aggregate function to return multiple columns. The following example works in both PySpark and Spark Scala because all of the columns are quoted strings:
df.groupBy("country").avg("salary", "id").show(7)
+-------------------+------------------+------------------+
|            country|       avg(salary)|           avg(id)|
+-------------------+------------------+------------------+
|               Chad|         134355.78|             931.0|
|             Russia| 153121.1850171821|             461.4|
|           Paraguay|175281.92375000002|            547.75|
|              Yemen|        166842.214| 500.3529411764706|
|U.S. Virgin Islands|          110669.2|             844.0|
|            Senegal|        234289.865|             455.5|
|             Sweden| 143383.7781451613|507.25757575757575|
+-------------------+------------------+------------------+
only showing top 7 rows

The Aggregate Method

We haven’t mentioned the agg() method yet. The agg() method, short for “aggregate”, is designed to accommodate multiple aggregate functions on the same DataFrame. There are many different ways to use agg(); as before, we will begin with basic examples and work up to more complicated scenarios.

Aggregate using agg()

The agg() method accepts DataFrame columns with aggregate functions wrapped around each column. Here the functions avg(), min(), max(), and sum() are applied to the columns of a DataFrame rather than to GroupedData. The purpose of the agg() method is to provide fine-grained control when generating aggregate data.

In the following two PySpark examples we use the agg() function on a single column "salary". In the first example we call the avg() function by selecting the column by index using the following code:

from pyspark.sql.functions import avg
df.groupby("country").agg(avg(df["salary"])).show(5)
+-------------------+------------------+
|            country|       avg(salary)|
+-------------------+------------------+
|               Chad|         134355.78|
|             Russia| 153121.1850171821|
|           Paraguay|175281.92375000002|
|              Yemen|        166842.214|
|U.S. Virgin Islands|          110669.2|
+-------------------+------------------+
only showing top 5 rows

In the second example we use the sum() function by selecting the column by attribute using the following code:

from pyspark.sql.functions import sum
df.groupby("country").agg(sum(df.salary)).show(5)
+-------------------+--------------------+
|            country|         sum(salary)|
+-------------------+--------------------+
|               Chad|           134355.78|
|             Russia|4.4558264839999996E7|
|           Paraguay|  1402255.3900000001|
|              Yemen|          2502633.21|
|U.S. Virgin Islands|            110669.2|
+-------------------+--------------------+
only showing top 5 rows

With the following Spark Scala code we use the agg() method to find the max salary for each country using the $-notation:

import spark.implicits._
import org.apache.spark.sql.functions.max
df.groupBy("country").agg(max($"salary")).show(5)
+-------------------+-----------+
|            country|max(salary)|
+-------------------+-----------+
|               Chad|  134355.78|
|             Russia|  286061.25|
|           Paraguay|  256483.86|
|              Yemen|  278676.73|
|U.S. Virgin Islands|   110669.2|
+-------------------+-----------+
only showing top 5 rows

With the following Spark Scala code we use the agg() method to find the minimum salary for each country by selecting the columns by index:

import org.apache.spark.sql.functions.min
df.groupBy("country").agg(min(df("salary"))).show(5)
+-------------------+-----------+
|            country|min(salary)|
+-------------------+-----------+
|               Chad|  134355.78|
|             Russia|   12092.34|
|           Paraguay|  101195.41|
|              Yemen|   43754.21|
|U.S. Virgin Islands|   110669.2|
+-------------------+-----------+
only showing top 5 rows

It is very important to realize that the avg() functions in the following two examples are not the same:

df.groupby("country").avg("salary").show(5)

df.groupby("country").agg(avg(df["salary"])).show(5)

Both commands produce exactly the same DataFrame with the same results. But the avg() in the first command is the avg() function called on GroupedData, while the avg() in the second command is the avg() function applied to a column of a DataFrame.

This is important because Spark can end up with the wrong function, either the GroupedData version or the DataFrame column version, and throw an error. If you ever run across an error in Scala like error: not found: value avg pointing to the aggregate function, or in Python like TypeError: Column is not iterable, the likely cause is ambiguity about which aggregate function you mean. The error arises from namespace collisions in Scala and Python between the two different ways of using aggregate functions. The solution is to be more specific in our commands, so Spark always knows which aggregate function you mean to use.

The first and easiest way to be specific when using the aggregate functions is to directly import the Spark functions before using them. For Scala use import org.apache.spark.sql.functions.avg and for Python use from pyspark.sql.functions import avg.

The Spark Scala example below uses the max() function inside the agg() method. To use the max() function we first need to import the function.

import spark.implicits._
import org.apache.spark.sql.functions.max
df.groupBy("country").agg(max($"salary")).show(5)

The following PySpark example shows importing the sum() function and then using the function to get the sum salary for each group.

from pyspark.sql.functions import sum
df.groupby("country").agg(sum(df.salary)).show(5)

Another option for PySpark is to import the functions module under an alias. That looks like from pyspark.sql import functions as F, where "F" is just an arbitrary name; it could be anything, like purple_dragons for example. Importing all the functions under an alias is a great way to make code more readable because there is never any confusion as to which aggregate function the author intended to use.

The following PySpark example imports all of the functions under the alias "F" and then uses them to find the maximum and average salary for each country in the "country" column:

from pyspark.sql import functions as F
df.groupBy("country").agg(F.max(df.salary), F.avg(df['salary'])).show(5)

The following PySpark code demonstrates that any name can be used as the alias when importing the functions module. In this example the alias purple_dragons is used to call the functions:

from pyspark.sql import functions as purple_dragons
df.groupby("country").agg(purple_dragons.min(df.salary), purple_dragons.max(df.salary)).show(5)

Did you notice that in the preceding examples we supplied multiple aggregate functions? To add more aggregate functions, separate them with commas inside the agg() method.

Exercise 23: Multiple Aggregates

  1. In this Spark Scala example, import the needed aggregate functions as well as the implicits for $-notation using the following code:
import spark.implicits._
import org.apache.spark.sql.functions.{avg, min, max, sum}
  2. Then use the groupBy() method on the "gender" column. Call the agg() method and pass the different aggregate functions for average, minimum, maximum, and sum on the "salary" column using the following code:
df.groupBy("gender").agg(avg(df("salary")), min(df("salary")), max($"salary"), sum($"salary")).show()
+------+------------------+-----------+-----------+--------------------+
|gender|       avg(salary)|min(salary)|max(salary)|         sum(salary)|
+------+------------------+-----------+-----------+--------------------+
|Female|147013.72066499369|   12121.98|  286735.82| 3.515098061099999E8|
|  Male|154682.83160139245|   12068.96|  286388.01|3.5546114701999986E8|
|      |              null|       null|       null|                null|
+------+------------------+-----------+-----------+--------------------+

Adding aggregate functions for as many numeric columns as you like is easy and straightforward with the agg() method.

Other agg() Options

In Spark there are other ways to supply aggregate functions to the agg() method: Python also accepts dictionaries, and Scala supports maps. The dictionary and the Map have different syntaxes, but both put the column name first and the aggregate function second.

However, both techniques have some limitations that make them less pleasant to use.

To use a dictionary in PySpark, pass the agg() method a Python dictionary in which each key is a column name and each value is the name of an aggregate function. Use the following code:

df.groupBy("gender").agg({'salary': 'mean', 'salary': 'sum'}).show()
+------+--------------------+
|gender|         sum(salary)|
+------+--------------------+
|Female| 3.515098061099999E8|
|  Male|3.5546114701999986E8|
|      |                null|
+------+--------------------+

Notice in the PySpark code we gave the agg() method two aggregate functions on the "salary" column, but the result only displays the last key-value pair. That is because a Python dictionary cannot contain duplicate keys, so the second "salary" entry overwrites the first; multiple aggregate functions on the same column are not supported with the dictionary technique in the agg() method.
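If you do need more than one aggregate on the same column, a simple workaround is to fall back to the function-based form of agg(). The following is a minimal PySpark sketch using the column functions imported from pyspark.sql.functions:

from pyspark.sql.functions import mean, sum
df.groupBy("gender").agg(mean("salary"), sum("salary")).show()

The dictionary form does handle aggregates on different columns. In the next example we request the sum of "salary" and the max of "id" in a single dictionary: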

df.groupBy("gender").agg({"salary": "sum", "id": "max"}).show()

The result returns both columns, but not in the order they were given in the code. When using a dictionary in the agg() method, the output columns are returned in alphabetical order, as shown in the following result:

+------+-------+--------------------+
|gender|max(id)|         sum(salary)|
+------+-------+--------------------+
|Female|   1000| 3.515098061099999E8|
|  Male|    999|3.5546114701999986E8|
|      |    998|                null|
+------+-------+--------------------+

To use a Map in Spark Scala, place a Map inside the agg() method. The column name comes first, followed by the -> operator, and then the name of the aggregate function. Multiple columns can be aggregated by separating the key-value pairs with commas, as shown in the following code:

df.groupBy("gender").agg(Map("salary" -> "max", "salary" -> "avg")).show()
+------+------------------+
|gender|       avg(salary)|
+------+------------------+
|Female|147013.72066499369|
|  Male|154682.83160139248|
|      |              null|
+------+------------------+

Similarly, the following code maps the "salary" column to the max function and the "id" column to the average function:

df.groupBy("gender").agg(Map("salary" -> "max", "id" -> "avg")).show()
+------+-----------+------------------+
|gender|max(salary)|           avg(id)|
+------+-----------+------------------+
|Female|  286735.82|496.99205353408615|
|  Male|  286388.01| 500.2448890822097|
|      |       null| 531.1391585760517|
+------+-----------+------------------+

Like Python’s dictionary, Scala’s Map doesn’t allow multiple aggregates on the same column: the duplicate "salary" key in the first example kept only the last entry. Unlike the dictionary, however, the Map does not reorder the output columns alphabetically.


Multiple columns can be grouped together by passing comma-separated column names to the groupBy() method. The resulting DataFrame contains a row for each unique combination of values across the grouping columns. For example, if the first parameter is "gender" and the second is "country", the resulting DataFrame has one row for every unique gender and country combination. All of the previous techniques can be applied to multiple groupBy columns.

Exercise 24: Multiple groupBy Columns

  1. To group by multiple columns, separate the column names with commas. In this basic PySpark and Spark Scala example, use quoted strings to group the data by "gender" and "country" and find the average of each numeric column:
df.groupby("gender", "country").avg().show(5)
+------+-------------+-----------------+------------------+
|gender|      country|          avg(id)|       avg(salary)|
+------+-------------+-----------------+------------------+
|  Male|     Slovenia|415.1818181818182|171955.97454545452|
|Female|New Caledonia|            699.5|        266981.955|
|Female|       Turkey|            442.0|         166409.44|
|      |        Malta|            238.0|              null|
|Female|       Bhutan|            319.0|        127988.435|
+------+-------------+-----------------+------------------+
only showing top 5 rows
  2. In this PySpark example, find the average and maximum values of the salary column for each group of "title" and "gender":
from pyspark.sql import functions as F
df.groupby("title", df["gender"]).agg(F.avg("salary"), F.max("salary")).orderBy(df.title.desc()).show(7)
+-----------------+------+------------------+-----------+
|            title|gender|       avg(salary)|max(salary)|
+-----------------+------+------------------+-----------+
| Web Developer IV|Female|         125946.64|  125946.64|
| Web Developer IV|  Male| 98066.17666666668|  186469.43|
|Web Developer III|      |              null|       null|
|Web Developer III|Female|         147422.81|  147422.81|
|Web Developer III|  Male| 95462.45999999999|   212486.1|
| Web Developer II|Female|219506.32333333333|  278897.55|
| Web Developer II|      |              null|       null|
+-----------------+------+------------------+-----------+
only showing top 7 rows
  3. In this Spark Scala example, find the maximum and minimum salaries for each group of "country" and "gender":
import spark.implicits._
import org.apache.spark.sql.functions.{max, min}
df.groupBy("country", "gender").agg(max(df("salary")), min($"salary"))
  .orderBy($"country".desc)
  .show(7)
+-----------------+------+-----------+-----------+
|          country|gender|max(salary)|min(salary)|
+-----------------+------+-----------+-----------+
|         Zimbabwe|  Male|  216808.38|    69209.3|
|         Zimbabwe|Female|  220421.92|   31016.29|
|           Zambia|  Male|  217828.04|   44165.46|
|            Yemen|      |       null|       null|
|            Yemen|  Male|  278676.73|   91481.77|
|            Yemen|Female|  273445.92|   43754.21|
|Wallis and Futuna|Female|   85623.28|   85623.28|
+-----------------+------+-----------+-----------+
only showing top 7 rows

Renaming Aggregate Columns

Columns can be renamed inside the agg() method using the alias() method, or renamed afterwards by chaining the withColumnRenamed() method. In all of the previous examples the resulting DataFrame had generated column names like “avg(salary)” or “max(id)”. In the following code we will see how to rename these columns while using the groupBy() method.

Both the alias() and the withColumnRenamed() methods have been covered previously in this chapter. The following four examples, two in PySpark and two in Spark Scala, produce exactly the same DataFrame; the difference is that in each language one example uses the alias() method and the other uses the withColumnRenamed() method.

This PySpark example uses the alias() method inside the agg() method after each aggregation function:

from pyspark.sql import functions as F
df.groupBy("country").agg(F.avg(df.salary).alias("avg_income"), F.max(df['salary']).alias("max_money")).show(7)

This next PySpark example uses the withColumnRenamed() method after the groupBy() method to rename the columns. Inside the withColumnRenamed() method the column name created by the groupBy() method still must be used as the first parameter:

from pyspark.sql import functions as F
df.groupBy("country").agg(F.avg(df.salary), F.max(df['salary'])) \
  .withColumnRenamed("max(salary)", "max_money") \
  .withColumnRenamed("avg(salary)", "avg_income") \
  .show(7)

This first Spark Scala example uses the alias() method inside the agg() method after each aggregation function:

import org.apache.spark.sql.functions.{avg, max}
df.groupBy("country").agg(avg(df("salary")).alias("avg_income"), max(df("salary")).alias("max_money")).show(7)

The last Spark Scala example uses the withColumnRenamed() method after the groupBy() method to rename the columns:

import org.apache.spark.sql.functions.{avg, max}

df.groupBy("country").agg(avg(df("salary")), max(df("salary")))
  .withColumnRenamed("avg(salary)", "avg_income")
  .withColumnRenamed("max(salary)", "max_money")
  .show(7)

Running any of the four preceding code samples displays the following:

+-------------------+------------------+---------+
|            country|        avg_income|max_money|
+-------------------+------------------+---------+
|               Chad|         134355.78|134355.78|
|             Russia| 153121.1850171821|286061.25|
|           Paraguay|175281.92375000002|256483.86|
|              Yemen|        166842.214|278676.73|
|U.S. Virgin Islands|          110669.2| 110669.2|
|            Senegal|        234289.865|244370.42|
|             Sweden| 143383.7781451613|277717.42|
+-------------------+------------------+---------+
only showing top 7 rows

Summary

This chapter laid the foundation for getting things done in Spark. The DataFrame is the vehicle that will take you anywhere you want to go in Spark. In fact, to work with Spark is to work with DataFrames.

We started off by introducing the myriad ways to construct DataFrames, from creating them out of different file formats to building them manually from a variety of data structures. We then covered the structure of DataFrames themselves: the schema.

From there we moved on to learning how to do things with the DataFrames we created: how to slice and dice them to reveal the insights we were looking for, how to interact with DataFrame columns and transform them, and finally how to group data together and perform calculations on those groups.

In the next chapter we are going to turn up the heat. We will build on this solid foundation and introduce advanced DataFrame concepts that show the full potential of Spark DataFrames. We will move on from simply doing things with DataFrames to actually making decisions based on the data inside them, and we will lay the groundwork for using DataFrames to make predictions with machine learning.
