Spark Starter Guide 4.8: How to Order and Sort Data

Previous post: Spark Starter Guide 4.7: How to Standardize Data

Introduction

Ordering is useful when you want to convey… well, order. More specifically, ordering (also known as sorting) is most often used in the final analysis or output of a data pipeline, as a way to display data organized by some criteria. The result is sorted data that is, ideally, easier to understand.

It looks like this in practice, in both Scala and Python:
myDataFrame.sort("age")

Alternatively, you can use .orderBy(), which is an alias function for .sort() to make it more friendly to SQL aficionados.
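For readers without a Spark session handy, the core idea behind .sort()/.orderBy() can be sketched with plain Python's built-in sorted(), which likewise orders records by a key. This is an illustrative analogy only, not Spark code:

```python
# Plain-Python analogy (not Spark): DataFrame.sort("age") orders rows by a
# column, just as sorted() orders records by a key.
pets = [("fido", "dog", 4, "brown"),
        ("annabelle", "cat", 15, "white"),
        ("jerry", "cat", 1, "white")]

by_age_asc = sorted(pets, key=lambda row: row[2])                 # like .sort("age") / .orderBy("age")
by_age_desc = sorted(pets, key=lambda row: row[2], reverse=True)  # like sorting descending

print(by_age_asc[0][0])   # name of the youngest pet
print(by_age_desc[0][0])  # name of the oldest pet
```

Just as with sorted() and its reverse flag, Spark lets you sort ascending (the default) or descending, as we will see below.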

Exercise Setup

In the analysis section of Spark Starter Guide 4.6: How to Aggregate Data, we asked these questions: “Who is the youngest cat in the data? Who is the oldest?”

Let’s use ordering in Spark as an alternative method to answer those same questions, and achieve the same result. Specifically, let’s again find the youngest and oldest cats in the data.

Follow these steps to complete the exercise in SCALA:

Import additional relevant Spark libraries using the following code:

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

Import spark.implicits, which enables handy operations such as the $ column syntax used in a later step, using the following code:

import spark.implicits._

Create a Sequence of Rows, each containing an animal's name, type, age, and color, using the following code:

val my_previous_pets = Seq(Row("fido", "dog", 4, "brown"),
                               Row("annabelle", "cat", 15, "white"),
                               Row("fred", "bear", 29, "brown"),
                               Row("daisy", "cat", 8, "black"),
                               Row("jerry", "cat", 1, "white"),
                               Row("fred", "parrot", 1, "brown"),
                               Row("gus", "fish", 1, "gold"),
                               Row("gus", "dog", 11, "black"),
                               Row("daisy", "iguana", 2, "green"),
                               Row("rufus", "dog", 10, "gold"))

Create a schema that corresponds to the data using the following code:

val schema = List(
      StructField("nickname", StringType, nullable = true),
      StructField("type", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true),
      StructField("color", StringType, nullable = true)
)

Use the parallelize() function of Spark to turn that Sequence into an RDD as shown in the following code:

val petsRDD = spark.sparkContext.parallelize(my_previous_pets)

Create a DataFrame from the RDD and schema created using the following code:

val petsDF = spark.createDataFrame(petsRDD, StructType(schema))

Create a temporary table view of the data in Spark SQL called pets using the following code:

petsDF.createOrReplaceTempView("pets")

You have two options for completing the goal:

  1. SQL approach
  2. Functional Approach

Option 1: Write a SQL query that gives us the name and age of the youngest cat. Do the same for the oldest. Print the results to the console using the following code:

spark.sql("select nickname as youngest_cat, min(age) as age from pets where type = 'cat' group by nickname order by age asc limit 1").show()

spark.sql("select nickname as oldest_cat, max(age) as age from pets where type = 'cat' group by nickname order by age desc limit 1").show()

The following is the output of the preceding code:

+------------+---+
|youngest_cat|age|
+------------+---+
|       jerry|  1|
+------------+---+

+----------+---+
|oldest_cat|age|
+----------+---+
| annabelle| 15|
+----------+---+

As we can see, jerry is the youngest cat at age 1, while annabelle is the oldest cat at age 15.

Option 2: Leverage the function-chaining alternative to accomplish the same thing. Print the results to the console using the following code:


petsDF.where("type = 'cat'").sort("age").limit(1).show()
petsDF.where("type = 'cat'").sort($"age".desc).limit(1).show()

The following is the output of the preceding code:

+--------+----+---+-----+
|nickname|type|age|color|
+--------+----+---+-----+
|   jerry| cat|  1|white|
+--------+----+---+-----+

+---------+----+---+-----+
| nickname|type|age|color|
+---------+----+---+-----+
|annabelle| cat| 15|white|
+---------+----+---+-----+

In addition to the name and the age of the oldest and the youngest cats, we also know that they are both white in color.


Follow these steps to complete the exercise in PYTHON:

Import additional relevant Spark libraries using the following code:

from pyspark.sql.functions import col

Create a List of tuples, each containing a name, type, age, and color, using the following code:

my_previous_pets = [("fido", "dog", 4, "brown"),
                    ("annabelle", "cat", 15, "white"),
                    ("fred", "bear", 29, "brown"),
                    ("daisy", "cat", 8, "black"),
                    ("jerry", "cat", 1, "white"),
                    ("fred", "parrot", 1, "brown"),
                    ("gus", "fish", 1, "gold"),
                    ("gus", "dog", 11, "black"),
                    ("daisy", "iguana", 2, "green"),
                    ("rufus", "dog", 10, "gold")]

Use the parallelize() function of Spark to turn that List into an RDD as shown in the following code:

petsRDD = sc.parallelize(my_previous_pets)

Create a DataFrame from the RDD and a provided schema using the following code:

petsDF = spark.createDataFrame(petsRDD, ['nickname', 'type', 'age', 'color'])

Create a temporary table view of the data in Spark SQL called ‘pets’ using the following code (createOrReplaceTempView supersedes the older, deprecated registerTempTable):

petsDF.createOrReplaceTempView('pets')

You have two options for completing the goal:

  • SQL approach
  • Functional approach

Option 1: Write a SQL query that gives us the name and age of the youngest cat. Do the same for the oldest. Print the results to the console using the following code:

spark.sql("select nickname as youngest_cat, "
          "min(age) as age "
          "from pets "
          "where type = 'cat' "
          "group by nickname "
          "order by age asc "
          "limit 1")\
    .show()

spark.sql("select nickname as oldest_cat, "
          "max(age) as age "
          "from pets "
          "where type = 'cat' "
          "group by nickname "
          "order by age desc "
          "limit 1")\
    .show()

The following is the output of the preceding code:

+------------+---+
|youngest_cat|age|
+------------+---+
|       jerry|  1|
+------------+---+

+----------+---+
|oldest_cat|age|
+----------+---+
| annabelle| 15|
+----------+---+

The output displays the name and age of the youngest and the oldest cat: jerry (age 1) and annabelle (age 15), respectively.

Option 2: Leverage the function-chaining alternative to accomplish the same thing. Print the results to the console using the following code:

petsDF.where("type = 'cat'").sort("age").limit(1).show()
petsDF.where("type = 'cat'").sort(col("age").desc()).limit(1).show()

The following is the output of the preceding code:

+--------+----+---+-----+
|nickname|type|age|color|
+--------+----+---+-----+
|   jerry| cat|  1|white|
+--------+----+---+-----+

+---------+----+---+-----+
| nickname|type|age|color|
+---------+----+---+-----+
|annabelle| cat| 15|white|
+---------+----+---+-----+

The preceding output tells us that the youngest cat is jerry, age 1 and white in color, while the oldest cat, annabelle, is 15 years old and also white.
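The filter-then-sort-then-limit logic of the chained DataFrame calls can be sketched in plain Python over the same records, which may help clarify what each step computes. This is an illustrative stand-in, not Spark code:

```python
# Plain-Python stand-in (not Spark) for the chained DataFrame calls:
# filter to cats, then take the row with the smallest / largest age.
pets = [("fido", "dog", 4, "brown"),
        ("annabelle", "cat", 15, "white"),
        ("fred", "bear", 29, "brown"),
        ("daisy", "cat", 8, "black"),
        ("jerry", "cat", 1, "white"),
        ("fred", "parrot", 1, "brown"),
        ("gus", "fish", 1, "gold"),
        ("gus", "dog", 11, "black"),
        ("daisy", "iguana", 2, "green"),
        ("rufus", "dog", 10, "gold")]

cats = [p for p in pets if p[1] == "cat"]   # like .where("type = 'cat'")
youngest = min(cats, key=lambda p: p[2])    # like .sort("age").limit(1)
oldest = max(cats, key=lambda p: p[2])      # like .sort(col("age").desc()).limit(1)

print(youngest)  # ('jerry', 'cat', 1, 'white')
print(oldest)    # ('annabelle', 'cat', 15, 'white')
```

Note that sorting ascending and taking the first row is equivalent to taking the minimum by that key, which is exactly what the limit-1 pattern exploits.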

In this exercise, we have learned how to sort our final output such that we can display information in an order that provides insight.

In the next section, we will learn how Ranking is used! Hint: it uses ordering/sorting!
