Spark Starter Guide 4.10: Using Having to Filter on Aggregate Columns
Previous post: Spark Starter Guide 4.9: How to Rank Data
Having is similar to filtering (filter(), where(), or a WHERE clause in SQL), but the use cases differ slightly. While filtering applies conditions to your non-aggregated columns to limit the result set, Having applies conditions to aggregate functions/columns instead.
Both limit your result set; the key difference is where they are applied. In short: where filters are for row-level filtering, and Having filters are for aggregate-level filtering. As a result, a Having clause can also simplify (or outright eliminate) the need for some sub-queries or CTEs, as the sketch below shows.
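For instance, here is a minimal sketch (assuming a SparkSession named spark and the pets view we build later in this section) of the same aggregate filter written both ways:
// Without having: aggregate in a sub-query, then filter the result with where.
spark.sql("""
  select type, total_age
  from (select type, sum(age) as total_age from pets group by type) as grouped
  where total_age > 10
""").show()

// With having: filter the aggregate directly; no sub-query needed.
spark.sql("""
  select type, sum(age) as total_age
  from pets
  group by type
  having sum(age) > 10
""").show()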
Let’s look at an example.
In previous entries in The Spark Starter Guide, we used filtering to remove animals from a dataset if their name didn't start with the letter c. We also used filtering to remove any animals categorized as pets from our dataset.
Having is useful if you want to aggregate a metric in your dataset, and then filter your dataset further based on that aggregated metric. In other words, it’s like a where clause for aggregate functions.
Exercise Setup
Which 3 categories of animals are, collectively (i.e. in total), the oldest?
Put more simply, this question asks: if you were to sum the ages of all animals within each type/category, which three categories have the greatest combined age? Thankfully, Spark makes it simple to achieve this.
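Note that the exercise below answers this with a having threshold of 10, which happens to keep exactly the three oldest groups in this data. A literal "top three" could also be taken with an order by and a limit, as in this minimal sketch (again assuming the pets view we create below):
spark.sql("""
  select type, sum(age) as total_age
  from pets
  group by type
  order by total_age desc
  limit 3
""").show()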
Follow these steps to complete the exercise in SCALA:
Import the relevant Spark classes using the following code:
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
These give us the Row class and the type classes we need to build rows and define a schema programmatically.
Import spark.implicits, which enables handy operations (such as the $"column" syntax used in a later step), using the following code:
import spark.implicits._
Create a Sequence of Rows, each containing a name, type, age and color using the following code:
val my_previous_pets = Seq(Row("fido", "dog", 4, "brown"),
  Row("annabelle", "cat", 15, "white"),
  Row("fred", "bear", 29, "brown"),
  Row("daisy", "cat", 8, "black"),
  Row("jerry", "cat", 1, "white"),
  Row("fred", "parrot", 1, "brown"),
  Row("gus", "fish", 1, "gold"),
  Row("gus", "dog", 11, "black"),
  Row("daisy", "iguana", 2, "green"),
  Row("rufus", "dog", 10, "gold"))
Create a schema that corresponds to the data using the following code:
val schema = List(
  StructField("nickname", StringType, nullable = true),
  StructField("type", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true),
  StructField("color", StringType, nullable = true)
)
Use the parallelize() function of Spark to turn that Sequence into an RDD as shown in the following code:
val petsRDD = spark.sparkContext.parallelize(my_previous_pets)
Create a DataFrame from the RDD and the schema you just created using the following code:
val petsDF = spark.createDataFrame(petsRDD, StructType(schema))
Create a temporary table view of the data in Spark SQL called pets using the following code:
petsDF.createOrReplaceTempView("pets")
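If you like, you can sanity-check that the view is queryable before moving on (a quick verification of our own, not part of the original exercise):
spark.sql("select * from pets").show()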
From here, you have two options for completing this task. You can use SQL code, or write the same logic programmatically.
Option 1: Write a SQL query in Spark SQL that returns each group whose collective age is greater than 10, using the following code:
spark.sql("select type, sum(age) as total_age from pets group by type having total_age > 10 order by total_age desc").show()
Option 2: Write the equivalent of that SQL query with programmatic Spark functions, calculating the collective age of each group of animals and returning only the groups with a collective age greater than 10.
petsDF.groupBy("type")
.agg("age" -> "sum")
.withColumnRenamed("sum(age)", "total_age")
.where("total_age > 10")
.orderBy($"total_age".desc)
.show()
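As a side note, an equivalent variation (our sketch, not part of the original exercise) imports the sum() function and aliases the aggregate inline with agg(), removing the need for withColumnRenamed():
import org.apache.spark.sql.functions.sum

// Alias the aggregate inline instead of renaming the generated "sum(age)" column.
petsDF.groupBy("type")
  .agg(sum("age").as("total_age"))
  .where("total_age > 10")
  .orderBy($"total_age".desc)
  .show()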
The following is the output of the preceding code:
+----+---------+
|type|total_age|
+----+---------+
|bear| 29|
| dog| 25|
| cat| 24|
+----+---------+
As you can see, the only groups that remain in our dataset are those with a collective age greater than 10: bears, dogs and cats.
Follow these steps to complete the exercise in PYTHON:
Create a List of Rows, each containing a name, type, age and color using the following code:
my_previous_pets = [("fido", "dog", 4, "brown"),
                    ("annabelle", "cat", 15, "white"),
                    ("fred", "bear", 29, "brown"),
                    ("daisy", "cat", 8, "black"),
                    ("jerry", "cat", 1, "white"),
                    ("fred", "parrot", 1, "brown"),
                    ("gus", "fish", 1, "gold"),
                    ("gus", "dog", 11, "black"),
                    ("daisy", "iguana", 2, "green"),
                    ("rufus", "dog", 10, "gold")]
Use the parallelize() function of Spark to turn that List into an RDD as shown in the following code:
petsRDD = spark.sparkContext.parallelize(my_previous_pets)
Create a DataFrame from the RDD and a provided schema using the following code:
petsDF = spark.createDataFrame(petsRDD, ['nickname', 'type', 'age', 'color'])
Create a temporary table view of the data in Spark SQL called ‘pets’ using the following code:
petsDF.createOrReplaceTempView('pets')
You have two options for accomplishing this task: using pure SQL, or taking the programmatic approach.
Option 1: Write a SQL query in Spark SQL that returns each group whose collective age is greater than 10, using the following code:
spark.sql("select type, "
"sum(age) as total_age "
"from pets "
"group by type "
"having total_age > 10 "
"order by total_age desc").show()
Option 2: Use the programmatic, function-chaining alternative to Spark SQL to calculate the collective age of each type of animal, retaining only the groups with a collective age over 10.
petsDF.groupBy("type")\
.sum("age")\
.withColumnRenamed("sum(age)", "total_age")\
.where("total_age > 10")\
.orderBy(col("total_age").desc())\
.show()
The following is the output of the preceding code:
+----+---------+
|type|total_age|
+----+---------+
|bear| 29|
| dog| 25|
| cat| 24|
+----+---------+
As expected, only the three groups above pass muster for our condition!
Coming Up
In the next section, we'll learn about normalized and de-normalized data, and one way Spark can help you navigate each.
Originally published at http://hadoopsters.wordpress.com on February 3, 2022.