Spark Starter Guide 4.10: Using Having to Filter on Aggregate Columns

Landon Robinson
Hadoopsters

Previous post: Spark Starter Guide 4.9: How to Rank Data

Having is similar to filtering (filter(), where(), or a WHERE clause in SQL), but the use cases differ slightly. While filtering lets you apply conditions to your non-aggregated columns to limit the result set, Having lets you apply conditions to aggregated columns instead.

Both limit your result set, but the key difference is where they are applied. In short: where filters operate at the row level, while Having filters operate at the aggregate level. As a result, a Having clause can also simplify (or outright eliminate) the need for some sub-queries or CTEs.

Let’s look at an example.

In previous entries in The Spark Starter Guide, we used filtering to remove animals from a dataset if their name didn’t start with the letter c. We also used filtering to remove any animals categorized as pets from our dataset.

Having is useful if you want to aggregate a metric in your dataset, and then filter your dataset further based on that aggregated metric. In other words, it’s like a where clause for aggregate functions.
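Before we set up the exercise, here's a minimal, hypothetical sketch of that distinction. The orders table and its columns are invented purely for illustration, and a SparkSession named spark is assumed, as everywhere else in this guide: the WHERE version drops individual rows before grouping, while the HAVING version drops whole groups after the aggregation.

import spark.implicits._

// Hypothetical data, just to contrast WHERE and HAVING.
Seq(("books", 5.0), ("books", 20.0), ("games", 60.0), ("games", 70.0))
  .toDF("category", "price")
  .createOrReplaceTempView("orders")

// WHERE filters individual rows *before* they are grouped...
spark.sql("select category, sum(price) as total from orders where price > 10 group by category").show()

// ...while HAVING filters the aggregated groups *after* the sum is computed.
spark.sql("select category, sum(price) as total from orders group by category having sum(price) > 50").show()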

Exercise Setup

Which 3 categories of animals are, collectively (i.e. in total), the oldest?

Put more simply, this question asks: if you were to sum the ages of all the animals in each type/category, which three categories would have the greatest combined age? Thankfully, Spark makes it simple to answer.

Follow these steps to complete the exercise in SCALA:

Import additional relevant Spark libraries using the following code:

import org.apache.spark.sql.functions.{max, min}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

These allow us to use Spark SQL functions like max() and min() programmatically.
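As a quick aside, here's a hedged sketch of what that programmatic style looks like. It assumes the petsDF DataFrame we'll create a few steps from now, and simply reports the oldest and youngest ages in the dataset:

// A small illustration of the imported functions. petsDF is created later in
// this exercise; run this after that step to see the oldest and youngest ages.
petsDF.agg(max("age"), min("age")).show()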

Import spark.implicits, which enables handy operations (such as the $ column syntax used in a later step), using the following code:

import spark.implicits._

Create a Sequence of Rows, each containing a name, type, age and color using the following code:

val my_previous_pets = Seq(Row("fido", "dog", 4, "brown"),
  Row("annabelle", "cat", 15, "white"),
  Row("fred", "bear", 29, "brown"),
  Row("daisy", "cat", 8, "black"),
  Row("jerry", "cat", 1, "white"),
  Row("fred", "parrot", 1, "brown"),
  Row("gus", "fish", 1, "gold"),
  Row("gus", "dog", 11, "black"),
  Row("daisy", "iguana", 2, "green"),
  Row("rufus", "dog", 10, "gold"))

Create a schema that corresponds to the data using the following code:

val schema = List(
  StructField("nickname", StringType, nullable = true),
  StructField("type", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true),
  StructField("color", StringType, nullable = true)
)

Use the parallelize() function of Spark to turn that Sequence into an RDD as shown in the following code:

val petsRDD = spark.sparkContext.parallelize(my_previous_pets)

Create a DataFrame from the RDD and schema created using the following code:

val petsDF = spark.createDataFrame(petsRDD, StructType(schema))

Create a temporary table view of the data in Spark SQL called pets using the following code:

petsDF.createOrReplaceTempView("pets")

From here, you have two options for completing this task. You can use SQL code, or write the same logic programmatically.

Option 1: Write a SQL query in Spark SQL that returns each group whose collective age is greater than 10, using the following code:

spark.sql("select type, sum(age) as total_age from pets group by type having total_age > 10 order by total_age desc").show()

Option 2: Write the equivalent logic with programmatic Spark functions to calculate the collective age of each group of animals, returning only the groups with a collective age greater than 10.

petsDF.groupBy("type")
.agg("age" -> "sum")
.withColumnRenamed("sum(age)", "total_age")
.where("total_age > 10")
.orderBy($"total_age".desc)
.show()

The following is the output of the preceding code:

+----+---------+
|type|total_age|
+----+---------+
|bear|       29|
| dog|       25|
| cat|       24|
+----+---------+

As you can see, the only animal types that remain in our dataset are those whose collective age is greater than 10: bears, dogs and cats.
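If you prefer column expressions, here's an equivalent sketch that aggregates and aliases in a single agg() call using sum() from org.apache.spark.sql.functions (assuming the same petsDF); it produces the same output as above.

import org.apache.spark.sql.functions.sum

// Equivalent logic: alias the aggregate inside agg() instead of renaming
// sum(age) afterwards, then filter and sort on that aliased column.
petsDF.groupBy("type")
  .agg(sum("age").as("total_age"))
  .where($"total_age" > 10)
  .orderBy($"total_age".desc)
  .show()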

Follow these steps to complete the exercise in PYTHON:

Create a List of tuples, each containing a name, type, age and color, using the following code:

my_previous_pets = [("fido", "dog", 4, "brown"),
                    ("annabelle", "cat", 15, "white"),
                    ("fred", "bear", 29, "brown"),
                    ("daisy", "cat", 8, "black"),
                    ("jerry", "cat", 1, "white"),
                    ("fred", "parrot", 1, "brown"),
                    ("gus", "fish", 1, "gold"),
                    ("gus", "dog", 11, "black"),
                    ("daisy", "iguana", 2, "green"),
                    ("rufus", "dog", 10, "gold")]

Use the parallelize() function of Spark to turn that List into an RDD as shown in the following code:

petsRDD = sc.parallelize(my_previous_pets)

Create a DataFrame from the RDD and a provided schema using the following code:

petsDF = spark.createDataFrame(petsRDD, ['nickname', 'type', 'age', 'color'])

Create a temporary table view of the data in Spark SQL called ‘pets’ using the following code:

petsDF.createOrReplaceTempView('pets')

You have two options for accomplishing this task: using pure SQL, or taking the programmatic approach.

Option 1: Write a SQL query in Spark SQL that returns each group whose collective age is greater than 10, using the following code:

spark.sql("select type, "
"sum(age) as total_age "
"from pets "
"group by type "
"having total_age > 10 "
"order by total_age desc").show()

Option 2: Use the programmatic, function-chaining alternative to Spark SQL to calculate the collective age of each type of animal, retaining only the groups with a collective age over 10 (note the col() import added below):

petsDF.groupBy("type")\
.sum("age")\
.withColumnRenamed("sum(age)", "total_age")\
.where("total_age > 10")\
.orderBy(col("total_age").desc())\
.show()

The following is the output of the preceding code:

+----+---------+
|type|total_age|
+----+---------+
|bear|       29|
| dog|       25|
| cat|       24|
+----+---------+

As expected, only the above three groups pass muster for our condition! They have what it takes!

Coming Up

In the next section, we'll learn about normalized and de-normalized data, and one way Spark can help you navigate them.

Originally published at http://hadoopsters.wordpress.com on February 3, 2022.
