Spark Starter Guide 4.6: How to Aggregate Data

Previous post: Spark Starter Guide 4.5: How to Join DataFrames

Introduction

Also known as grouping, aggregation is the method by which data is summarized by dividing it into common, meaningful groups. At the same time that information is grouped, you can also summarize data points from those groups through a series of SQL-like aggregate functions (i.e. functions that you can run during data aggregations).

Say you run an online e-commerce website that sells shoes. You have data logs that tell you what each customer purchased, and when.

customershoe_namesale_pricepurchase_date
landonblue shoes5.002020-10-01
jamesblue shoes5.002020-10-04
zachwhite shoes6.002020-10-06
An example data set of sold shoes, called Sales.

At the end of each month, you might want to aggregate that data such that you can see how much revenue each model of shoe brought in that month. In SQL, that might look something like this:

select sum(sale_price) as revenue, 
       shoe_name 
from sales 
group by shoe_name

In this example, shoe_name is our grouping field, and the sum total of sales (for each shoe_name) is our aggregation metric. You would expect results to show something like:

itemrevenue
blue shoes10.00
white shoes6.00
After all, we had two sales of blue shoes at $5/pair, and only one sale of white shoes at $6 a pair. If we had just done sum(sale_price) as revenue, but didn’t group by shoe_name, we’d simply get a total of $16.

Sum, of course, is just one example of an aggregate function. Others include min(), max(), count(), and stdev(), to name a few. All of these can be used to summarize identifiable groups within your data.

In the following exercises, we will learn to analyze data in groups by way of aggregation.


Exercise Setup

Follow these steps to complete the exercise in SCALA:

Import additional relevant Spark libraries using the following code:

import org.apache.spark.sql.functions.{max, min}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

Continuing to build on our animal data set (from previous exercises in this guide), Create a Sequence of Rows, each containing an animal name, type, age and color using the following code:

val my_previous_pets = Seq(
  Row("fido", "dog", 4, "brown"),
  Row("annabelle", "cat", 15, "white"),
  Row("fred", "bear", 29, "brown"),
  Row("gus", "parakeet", 2, "black"),
  Row("daisy", "cat", 8, "black"),
  Row("jerry", "cat", 1, "white"),
  Row("fred", "parrot", 1, "brown"),
  Row("gus", "fish", 1, "gold"),
  Row("gus", "dog", 11, "black"),
  Row("daisy", "iguana", 2, "green"),
  Row("rufus", "dog", 10, "gold"))

Create a schema that mirrors the data you just created using the following code. The schema will be used by Spark to form a DataFrame.

val schema = List(
  StructField("nickname", StringType, nullable = true),
  StructField("type", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true),
  StructField("color", StringType, nullable = true)
)

Use the parallelize() function of Spark to turn that Sequence into an RDD as shown in the following code:

val petsRDD = spark.sparkContext.parallelize(my_previous_pets)

Create a DataFrame from the RDD and schema created using the following code:

val petsDF = spark.createDataFrame(petsRDD, StructType(schema))

Create a temporary table view of the data in Spark SQL called ‘pets’ using the following code:

petsDF.createOrReplaceTempView("pets")

You’ve now completed the initial setup for this exercise in Scala. Skip past the subsequent section in Python to continue.


Follow these steps to complete the exercise in PYTHON:

Import relevant Spark SQL libraries using the following code:

from pyspark.sql import functions as F

Continuing to build on our animal data set, Create a List of Rows, each containing an animal name, type, age and color using the following code:

my_previous_pets = [("fido", "dog", 4, "brown"),
                    ("annabelle", "cat", 15, "white"),
                    ("fred", "bear", 29, "brown"),
                    ("gus", "parakeet", 2, "black"),
                    ("daisy", "cat", 8, "black"),
                    ("jerry", "cat", 1, "white"),
                    ("fred", "parrot", 1, "brown"),
                    ("gus", "fish", 1, "gold"),
                    ("gus", "dog", 11, "black"),
                    ("daisy", "iguana", 2, "green"),
                    ("rufus", "dog", 10, "gold")]

Use the parallelize() function of Spark to turn that List into an RDD as shown in the following code:

petsRDD = sc.parallelize(my_previous_pets)

Create a DataFrame from the RDD and a provided schema using the following code:

petsDF = spark.createDataFrame(petsRDD, ['nickname', 'type', 'age', 'color'])

Create a temporary table view of the data in Spark SQL called ‘pets’ using the following code:

petsDF.createOrReplaceTempView('pets')

You’ve now completed the initial setup for this exercise in Python.

If you chose to do the setup for this exercise in Scala, you can now proceed from this point.

Analysis through Aggregation

We now have an in-memory view (i.e. relational table representation) of our data. It only exists within the confines of our Spark application, but it enables us to run SQL queries against it as if it were a real table in a database. This will prove handy time after time in your work with Spark.

Now that we have a query-able view, let’s answer several different questions about the data by using Spark’s rich, comprehensive SQL functionality to query the temporary view we’ve created. Once a table is registered, you can query it as many times as you like, again, as if it were a real table.

As usual, we’ve provided solutions in both Scala and Python for your convenience. Where the solution is the exact same, we have coalesced the code.

What are the three most popular (i.e. recurring) names in the data?

To answer this question, we need to write a SQL query in Spark SQL that gives us the count of name occurrences in the table. You can also do this functionally (with methods on the DataFrame), but this example will showcase the pure SQL approach.

The following shows the code in SCALA and PYTHON:

spark.sql("select nickname, count(*) as occurrences from pets group by nickname order by occurrences desc limit 3").show()

The following is the output of the preceding code:

+--------+-----------+
|nickname|occurrences|
+--------+-----------+
|     gus|          3|
|    fred|          2|
|   daisy|          2|
+--------+-----------+

As can be seen the three most popular names are gus, fred, and daisy.

How old is the oldest cat in the data?

We can use the functional API of Spark SQL to find the maximum age of cats in the data. As demonstrated above, you can also use pure SQL to achieve this – but this example will focus on the purely functional approach.

Follow these steps for SCALA:

  1. Use the where() function of the DataFrame to filter the data to just cats.
  2. Use the agg() function of the DataFrame to select the max age.
  3. Use the show() function of the DataFrame to print the results to the console.

In code form:

petsDF.where("type = 'cat'")
      .agg(Map("age" -> "max"))
      .show()

The following is the output of the preceding code:

+--------+
|max(age)|
+--------+
|      15|
+--------+

The oldest cat in our data is 15!

Follow these steps for PYTHON:

  1. Use the where() function of the DataFrame to filter the data to just cats.
  2. Use the agg() function of the DataFrame to select the max age.
  3. Use the show() function of the DataFrame to print the results to the console.

In code form:

petsDF.where("type = 'cat'")\
    .agg({"age": "max"})\
    .show()

The following is the output of the preceding code:

+--------+
|max(age)|
+--------+
|      15|
+--------+

What are the youngest and oldest cat ages?

We can use the functional API of Spark SQL to to answer this question.

Following are the steps for implementing this in SCALA:

  1. Use the where() function of the DataFrame to filter the data to just cats.
  2. Group the data by type using groupBy().
  3. Then, combine the agg() function of the DataFrame with the min() and max() functions to request two metrics: min and max age. Optional: rename the columns using alias().
  4. Finally, print the results to the console using the show() function of the DataFrame.
petsDF.where("type = 'cat'")
  .groupBy("type")
  .agg(min("age").alias("min_age"), max("age").alias("max_age"))
  .show()

The following is the output of the preceding code:

+----+-------+-------+
|type|min_age|max_age|
+----+-------+-------+
| cat|      1|     15|
+----+-------+-------+

As can be seen the youngest cat’s age is 1 and the oldest cat’s age is 15.

Following are the steps for implementing this in PYTHON:

  1. Use the where() function of the DataFrame to filter the data to just cats.
  2. Group the data by type using groupBy().
  3. Then, combine the agg() function of the DataFrame with the min() and max() functions to request two metrics: min and max age. Optional: rename the columns using alias().
  4. Finally, print the results to the console using the show() function of the DataFrame.
petsDF.where(petsDF["type"] == "cat") \
    .groupBy("type") \
    .agg(F.min("age"), F.max("age")) \
    .show()

The following is the output of the preceding code:

+----+-------+-------+
|type|min_age|max_age|
+----+-------+-------+
| cat|      1|     15|
+----+-------+-------+

What is the average dog age?

We can use the functional API of Spark SQL to find the average age of dogs.

Following are the steps for implementing this in SCALA:

  1. Use the where() function of the DataFrame to filter the data to just dogs.
  2. Group the data by type using groupBy().
  3. Use the agg() function of the DataFrame to select the average age.
  4. Finally, print the results to the console using the show() function of the DataFrame.
petsDF.where("type = 'dog'")
    .groupBy("type")
    .agg("age" -> "avg")
    .show()

The following is the output of the preceding code:

+----+-----------------+
|type|         avg(age)|
+----+-----------------+
| dog|8.333333333333334|
+----+-----------------+

As can be seen the average dog’s age is 8.334.

Following are the steps for implementing this in PYTHON:

  1. Use the where() function of the DataFrame to filter the data to just dogs.
  2. Group the data by type using groupBy().
  3. Use the agg() function of the DataFrame to select the average age.
  4. Finally, print the results to the console using the show() function of the DataFrame.
petsDF.where("type = 'dog'")\
    .groupBy("type")\
    .agg(F.avg("age"))\
    .show()

The following is the output of the preceding code:

+----+-----------------+
|type|         avg(age)|
+----+-----------------+
| dog|8.333333333333334|
+----+-----------------+

As can be seen the average age of dogs is 8.334.

How many pets of each color are there in the data?

We can use the functional API of Spark SQL to find how many pets of each color exist in the data.

Following are the steps for implementing this in SCALA and PYTHON:

  1. Group the data by type using groupBy().
  2. Use the groupBy() function of the DataFrame to count the records in each group.
  3. Finally, print the results to the console using the show() function of the DataFrame.
petsDF.groupBy("color").count().show()

The following is the output of the preceding code:

+-----+-----+
|color|count|
+-----+-----+
|green|    1|
|white|    2|
| gold|    2|
|black|    2|
|brown|    3|
+-----+-----+

As can be seen the number of pets in each colors can be seen here.

And that’s a rough introduction to aggregation in Spark / Spark SQL! There are a lot of powerful operations you can conduct in Spark, so keep exploring the APIs!

In the next section, we’ll cover Standardization – a practice that’s common in Data Engineering but not necessarily in Spark. See you then!

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

This site uses Akismet to reduce spam. Learn how your comment data is processed.