Spark Starter Guide 4.5: How to Join DataFrames

Previous post: Spark Starter Guide 4.4: How to Filter Data

Introduction

If you’ve spent any time writing SQL, or Structured Query Language, you might already be familiar with the concept of a JOIN. If you’re not, then the short explanation is that you can use it in SQL to combine two or more data tables together, leveraging a column of data that is shared or related between them.

Joining is handy in a number of ways, like supplementing a large dataset with additional information or performing lookups. There are several types of join, such as left, right, inner, and full outer, and Spark has multiple implementations of each to make them convenient and fast for you as an engineer/analyst to use. It's all possible using the join() method; a short sketch contrasting the types follows the list below.

  • Inner Join: Return records that have matching values in both tables that are being joined together. Rows that do not match on both sides are not kept.
  • Left (Outer) Join: Return all records from the left table, and only the matched records from the right table. This is useful in situations where you need all the rows from the left table, joined with rows from the right table that match. It can be described as supplementing the left table with information from the right table.
  • Right (Outer) Join: Return all records from the right table, and only the matched records from the left table. This is useful in situations where you need all the rows from the right table, joined with rows from the left table that match. It’s the reversed version of the Left (Outer) Join.
  • Full (Outer) Join: Return all records from both the left table and the right table, whether they have matching values or not. This is useful when you need matching rows joined together while also keeping the non-matching rows from both sides.
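
To make these four types concrete, here is a minimal sketch. It assumes an active SparkSession named spark (as in the exercise below); the two tiny DataFrames and their column names are invented purely for illustration, and the third argument to join() is the joinType string that selects the behavior described above.

import org.apache.spark.sql.functions.col
import spark.implicits._  // assumes an active SparkSession named `spark`

// Two hypothetical DataFrames with related key columns, for illustration only.
val leftDF  = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "left_value")
val rightDF = Seq((2, "x"), (3, "y"), (4, "z")).toDF("rid", "right_value")
val keys = col("id") === col("rid")

leftDF.join(rightDF, keys, "inner").show() // ids 2 and 3 only
leftDF.join(rightDF, keys, "left").show()  // ids 1, 2, 3; right_value is null for id 1
leftDF.join(rightDF, keys, "right").show() // rids 2, 3, 4; left_value is null for rid 4
leftDF.join(rightDF, keys, "full").show()  // all rows from both sides; nulls where no match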

In the following exercise, we will see how to join two DataFrames.


Follow these steps to complete the exercise in SCALA:

Import additional relevant Spark libraries using the following code:

import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.col

Create a Sequence of Rows where the content is a tuple containing an animal and its category using the following code:

val categorized_animals = Seq(Row("dog", "pet"),
                              Row("cat", "pet"),
                              Row("bear", "wild"))

Create a schema that corresponds to the data using the following code:

val schema_animals = List(
  StructField("name", StringType, nullable = true),
  StructField("category", StringType, nullable = true)
)

Use the parallelize() function of Spark to turn that Sequence into an RDD as shown in the following code:

val animalDataRDD = spark.sparkContext.parallelize(categorized_animals)

Create a DataFrame from the RDD and schema created using the following code:

val animalData = spark.createDataFrame(animalDataRDD, StructType(schema_animals))
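
As an aside, for a small literal dataset like this one you don't strictly need the Row/schema/RDD steps: Spark can infer the schema from a Seq of tuples via toDF(). The following is just an alternative sketch, assuming the same SparkSession named spark; the exercise itself continues with the explicit-schema approach.

import spark.implicits._

// Alternative sketch: let Spark infer the schema from a Seq of tuples.
val animalDataAlt = Seq(("dog", "pet"), ("cat", "pet"), ("bear", "wild"))
  .toDF("name", "category")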

Create a Sequence of Rows where the content is a tuple containing an animal and its food using the following code:

val animal_foods = Seq(Row("dog", "kibble"),
                       Row("cat", "canned tuna"),
                       Row("bear", "salmon"))

Again, we will create a schema that corresponds to the data from the preceding step using the following code:

val schema_foods = List(
  StructField("animal", StringType, nullable = true),
  StructField("food", StringType, nullable = true)
)

Using the parallelize() function of Spark, we will turn that Sequence into an RDD as shown in the following code:

val animalFoodRDD = spark.sparkContext.parallelize(animal_foods)

We will then create a DataFrame from the RDD and schema created using the following code:

val animalFoods = spark.createDataFrame(animalFoodRDD, StructType(schema_foods))

Join one DataFrame to the other on the value they have in common: the animal name. Print the results to the console using the following code:

val animals_enhanced = animalData.join(
      animalFoods,
      col("name") === col("animal"),
      "left")

animals_enhanced.show()

The following is the output of the preceding code:

+----+--------+------+-----------+
|name|category|animal|       food|
+----+--------+------+-----------+
| dog|     pet|   dog|     kibble|
|bear|    wild|  bear|     salmon|
| cat|     pet|   cat|canned tuna|
+----+--------+------+-----------+

From the preceding table, we can see that rows were matched on the common values in the name and animal columns; for each animal, its category and its corresponding food are listed side by side.
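
To see why the joinType of "left" matters here, imagine a hypothetical extra animal that has no entry in the foods data. A left join keeps that row and fills the missing columns with null. The following is only a sketch built on top of the exercise's DataFrames, with an invented "hamster" row:

// Hypothetical unmatched row, reusing the schema from earlier in the exercise.
val extraAnimal = spark.createDataFrame(
  spark.sparkContext.parallelize(Seq(Row("hamster", "pet"))),
  StructType(schema_animals))

val withUnmatched = animalData.union(extraAnimal)
  .join(animalFoods, col("name") === col("animal"), "left")

withUnmatched.show() // the hamster row survives, with null in the animal and food columns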


Follow these steps to complete the exercise in PYTHON:

Create a List of tuples containing an animal and its category using the following code:

categorized_animals = [("dog", "pet"), ("cat", "pet"), ("bear", "wild")]

Create a List of tuples containing an animal and its food using the following code:

animal_foods = [("dog", "kibble"), ("cat", "canned tuna"), ("bear", "salmon")]

Use the parallelize() function of Spark to turn those Lists into RDDs as shown in the following code:

animalDataRDD = sc.parallelize(categorized_animals)
animalFoodRDD = sc.parallelize(animal_foods)

Create DataFrames from the RDDs using the following code:

animalData = spark.createDataFrame(animalDataRDD, ['name', 'category'])
animalFoods = spark.createDataFrame(animalFoodRDD, ['animal', 'food'])

Join one DataFrame to the other on the value they have in common: the animal name. Note that no join type is passed here, so Spark uses its default, an inner join; because every animal matches, the output is the same as the Scala left join above. Print the results to the console using the following code:

animals_enhanced = animalData.join(animalFoods, animalData.name == animalFoods.animal)
animals_enhanced.show()

The following is the output of the preceding code:

+----+--------+------+-----------+
|name|category|animal|       food|
+----+--------+------+-----------+
| dog|     pet|   dog|     kibble|
|bear|    wild|  bear|     salmon|
| cat|     pet|   cat|canned tuna|
+----+--------+------+-----------+

This is just one way to join data in Spark. The .join() method also supports another form, called the usingColumn approach.

The usingColumn Join Method

If the exercise were a bit different, say, if the join key/column of the left and right data sets had the same column name, we could perform the join slightly differently and attain the same results. This is called the usingColumn approach, and it's handy for its simplicity.

The following shows the code in SCALA:

leftData.join(rightData, "columnInCommon")

We can try the same thing in PYTHON using the following code:

leftData.join(rightData, on="columnInCommon")

There is an extension of this approach that allows you to involve multiple columns in the join, and all you have to do is provide it a Scala Sequence or Python List of the appropriate columns you wish to join on. If you’re familiar with the JOIN USING clause in SQL, this is effectively that. Keep in mind that each value in the list must exist on both sides of the join for this approach to succeed.

The following shows the code in SCALA:

leftData.join(rightData, Seq("firstColumnInCommon", "secondColumnInCommon"))

We can try the same thing in PYTHON using the following code:

leftData.join(rightData, on=['firstColumnInCommon', 'secondColumnInCommon'])
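
One practical difference of this form is worth noting: Spark keeps a single copy of the join column(s) in the output, rather than one column from each side. As a sketch using the exercise data in Scala, we could rename the right-hand key with withColumnRenamed so both sides share the name column, and then join on it:

// Sketch: give both DataFrames the same key column name, then join using it.
val foodsRenamed = animalFoods.withColumnRenamed("animal", "name")

animalData.join(foodsRenamed, Seq("name"), "left").show()
// Output columns: name, category, food (the join key appears only once).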

These methods are by no means the only way to join data, but they are the most common and straightforward.

In the next section, we will learn about data aggregation.
