Introduction

Now that we have created DataFrames manually and from files, it is time to actually do things with them. This section introduces the fundamental operations on DataFrames: displaying, selecting, filtering, and sorting. As discussed in Section 1.1, performing an operation on a DataFrame actually creates a new DataFrame behind the scenes in Spark, regardless of whether the result is given a new name or simply displayed. When working with DataFrames, the results can be displayed (as will be covered shortly), saved to a DataFrame with the same name as the original, or saved under a different DataFrame name.

To illustrate, a DataFrame is created with the name some_data_frame as shown in the following code:

some_data_frame = spark.createDataFrame(…)

An operation can be performed on some_data_frame and the resulting DataFrame saved under the original name as shown in the following code:

some_data_frame = some_data_frame.filter(…)

Or, after performing an operation, the resulting DataFrame can be saved under a different variable name as shown in the following code:

a_new_dataframe_variable = some_data_frame.filter(…)

Note:

This section uses a PySpark and Spark Scala DataFrame called df created from the Parquet files found at the GitHub repository https://github.com/Teradata/kylo/tree/master/samples/sample-data. In Section 1.3 we demonstrated copying this data to HDFS. (In Spark Scala, prefix the assignment below with val.)

df = spark.read.parquet("hdfs://user/your_user_name/sample-data/parquet/userdata1.parquet")

Displaying DataFrames

We have already seen the show() method on a DataFrame to display the top rows of a DataFrame. There are two other methods to display the contents of a DataFrame called head() and take(). Both do the exact same thing and return the top n rows of a DataFrame as Row objects. The output is a list in PySpark and an array in Scala, with each row of the DataFrame represented as a Row() inside the list or array. When head() is called without an argument, it returns just the first row.

In Scala or PySpark use either head() or take() on a DataFrame; the code is the same in both languages:

df.head(2)

or

df.take(2)

The following shows the PySpark output:

[Row(registration_dttm=datetime.datetime(2016, 2, 3, 7, 55, 29), id=1, first_name='Amanda', last_name='Jordan', email='ajordan0@com.com', gender='Female', ip_address='1.197.201.2', cc='6759521864920116', country='Indonesia', birthdate='3/8/1971', salary=49756.53, title='Internal Auditor', comments='1E+02'),
 Row(registration_dttm=datetime.datetime(2016, 2, 3, 17, 4, 3), id=2, first_name='Albert', last_name='Freeman', email='afreeman1@is.gd', gender='Male', ip_address='218.111.175.34', cc='', country='Canada', birthdate='1/16/1968', salary=150280.17, title='Accountant IV', comments='')]

The following shows the Spark Scala output:

res1: Array[org.apache.spark.sql.Row] = Array([2016-02-03 07:55:29.0,1,Amanda,Jordan,ajordan0@com.com,Female,1.197.201.2,6759521864920116,Indonesia,3/8/1971,49756.53,Internal Auditor,1E+02], [2016-02-03 17:04:03.0,2,Albert,Freeman,afreeman1@is.gd,Male,218.111.175.34,,Canada,1/16/1968,150280.17,Accountant IV,])

The output returns a Row object for each row. In PySpark, each Row shows the column name, an equals sign, and the value, and all of the Row objects are wrapped in a Python list. In Spark Scala, the Row objects just list the values of each row, and all of the Row objects are wrapped in a Scala array.
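Because head() and take() return Row objects rather than a new DataFrame, individual values can be pulled out of the result directly. The following is a minimal PySpark sketch using the df created above (the variable names rows and first_row are just illustrative):

rows = df.head(2)                # a Python list holding two Row objects
first_row = rows[0]
first_row["first_name"]          # value by column name, e.g. 'Amanda'
first_row.salary                 # value by attribute, e.g. 49756.53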

Selecting Columns of a DataFrame

In Spark, users can choose or select individual columns of a DataFrame in multiple ways. Columns can be chosen by column name or column index in both Scala and PySpark. Scala has a unique technique called the $-notation for choosing columns, and PySpark has a unique way to choose by column attribute. The techniques in this section are demonstrated using the select() method, but they work with all other Spark methods that accept columns.

Selecting columns with the select() method is a way to reduce the size of a DataFrame by keeping only certain columns. If we select columns, we are reducing the DataFrame to just the columns we specified. After selecting, we can display the results, save the new DataFrame under a new variable name, or save it under the same name.
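For example, the following small PySpark sketch keeps only two columns and saves the result under a new variable name (names_df is just an illustrative name):

names_df = df.select("first_name", "last_name")   # new two-column DataFrame
names_df.show(5)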

Access by Column Name

To select a column from a DataFrame use the select() method along with the column or columns in quotes. To choose more than one column, separate the quoted column names by commas. You can select columns in any order, which means you can easily rearrange columns of a DataFrame.

In the following PySpark and Spark Scala code, we select the "first_name" and "last_name" columns and display the first five rows:

df.select("first_name", "last_name").show(5)

Running the preceding code displays the following:

+----------+---------+
|first_name|last_name|
+----------+---------+
|    Amanda|   Jordan|
|    Albert|  Freeman|
|    Evelyn|   Morgan|
|    Denise|    Riley|
|    Carlos|    Burns|
+----------+---------+
only showing top 5 rows

The following PySpark and Spark Scala code demonstrates that we can select the columns in any order that we would like. This is a great way to rearrange the columns in a DataFrame:

df.select("last_name", "first_name").show(5)

Running the preceding code displays the following:

+---------+----------+
|last_name|first_name|
+---------+----------+
|   Jordan|    Amanda|
|  Freeman|    Albert|
|   Morgan|    Evelyn|
|    Riley|    Denise|
|    Burns|    Carlos|
+---------+----------+
only showing top 5 rows

You can also select all the columns of a DataFrame by passing "*" to the select() method, which is the same as executing df.show(). Most people don’t know the select() method has this ability.

The following PySpark and Spark Scala code selects every column in the DataFrame:

df.select("*").show()

Access by Column Index

In PySpark and Spark Scala the preferred technique to access columns is by index. It is backwards compatible with previous Spark versions, accommodates column names with spaces, and won’t break if a column name clashes with an attribute of any of Spark’s classes.

In PySpark, accessing columns by index uses square bracket notation; in Spark Scala, the column name is passed to the DataFrame in parentheses. The following PySpark and then Spark Scala code selects three columns from the DataFrame by column index:

df.select(df['first_name'], df['country'], df['salary']).show()
df.select(df("first_name"), df("country"), df("salary").show()

PySpark Access by Column Attribute

PySpark has the ability to access a DataFrame column by attribute, which uses a dot notation. Here is a basic example.

df.select(df.id, df.first_name, df.last_name).show()

The dot notation makes data exploration very easy and straightforward. However, if a column name contains spaces or clashes with an attribute or method of a Spark class, this notation will throw errors.
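To illustrate the limitation, suppose a hypothetical DataFrame df2 had a column named "first name" (with a space). The dot notation cannot express that name, but the square bracket notation still works:

df2.select(df2["first name"]).show()    # bracket notation handles the space
# df2.select(df2.first name).show()     # invalid syntax: dot notation cannot handle the space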

Note:

In PySpark, you can mix and match the dot notation and the square bracket notation!

df.select(df['first_name'], df.country, df.salary).show()

Scala $-notation

Scala has a unique technique for selecting columns using the dollar sign, aptly called the $-notation. The $-notation uses the dollar sign followed by the column name in double quotes to access a column. But this notation requires importing spark.implicits._

Here is a basic example of selecting three columns using the $-notation in Scala:



df.select($"id", $"gender", $"birthdate").show()

This technique is quick and easy, and since the column name sits inside double quotes, column names with spaces won’t throw errors.

Note:

In Spark Scala, you can mix and match accessing by index and the $-notation!

import spark.implicits._

df.select(df("first_name"), $"country", $"salary").show()

Another interesting thing to do with the select() method is to return the unique values of a particular column by using the distinct() method. Simply call distinct() after selecting one column to return a DataFrame of the unique values.

Exercise 12: Finding Distinct Values

  1. To return the unique values of the “gender” column (including spaces, numbers, special characters, and nulls), use the following PySpark or Spark Scala code:
df.select("gender").distinct().show()
+------+
|gender|
+------+
|Female|
|  Male|
|      |
+------+
  2. In PySpark or Scala select the “country” column and display the first five unique country values with the following code:
df.select("country").distinct().show(5)
+-------------------+
|            country|
+-------------------+
|               Chad|
|             Russia|
|           Paraguay|
|              Yemen|
|U.S. Virgin Islands|
+-------------------+
only showing top 5 rows

Filtering Rows of a DataFrame

In Spark, filtering means reducing the rows of a DataFrame to the subset of rows that meet one or more conditions. Filtering uses the filter() method on a DataFrame. The condition can consist of inequalities (greater than, less than, and so on), equality to strings or numbers, or Boolean (true/false) expressions. A filter() condition generally takes the form: column operator value

In PySpark, choose the "salary" by column attribute and filter the DataFrame to all the rows with a salary above the integer 50,000. Display the first four results:

df.filter(df.salary > 50000).show(4)

We can also combine methods. If we want the number of salaries over 50,000, we can use the count() method to return the number of rows as an integer.

df.filter(df["salary"] > 50000).count()

The count() method counts the number of rows in a DataFrame and returns that number as an integer. The result of the preceding command is a single value.
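For example, the returned integer can be stored in a variable and used like any other Python value (a minimal PySpark sketch; the variable name is just illustrative):

high_salary_count = df.filter(df["salary"] > 50000).count()
print(high_salary_count)    # prints a single integer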

In Scala, use the $-notation to return the number of people with a first name equal to “Albert”:

import spark.implicits._

df.filter($"first_name" === "Albert").count()

In PySpark, this would look like:

df.filter(df["first_name"] == "Albert").count()

Filtering is a vital operation when using DataFrames. And it is easy to add more filters as we will see in the next exercise.

Spark allows chaining multiple filtering conditions together. This would filter the DataFrame by condition A and then filter the results of condition A by condition B. As long as there is data to filter, more filter() methods can be chained together.

In addition to chaining the filter() method, filtering can also be applied with the and and or operators. An and is equivalent to the chaining done previously: filter by condition A and then filter the results of condition A by condition B, and so on. An or returns the rows that meet at least one of the conditions. In general, an or should return more results than an and.

In Python, and is represented by a single ampersand “&” and or is represented by a single pipe “|”; each condition must also be wrapped in parentheses when conditions are combined. In Scala, and is represented by a double ampersand “&&” and or is represented by a double pipe “||”.

Exercise 13: Filtering with Multiple Conditions

  1. In PySpark, filter all the salaries greater than 10,000 where the gender is also female. Show an example choosing columns by index and then by attribute with the following code:
df.filter(df["salary"] > 10000).filter(df["gender"] == "Female").show()

df.filter(df.salary > 10000).filter(df.gender == "Female").show()
  2. In Scala, do the same thing but with only the $-notation as shown in the following code:
import spark.implicits._

df.filter($"salary" > 10000).filter($"gender" === "Female").show()
  3. In PySpark, use the and and or operators: first count the rows with a salary greater than 150,000 and a gender of male, then count the rows with a salary greater than 150,000 or a gender of female, with the following code:
df.filter((df.salary > 150000) & (df.gender == "Male")).count()

df.filter((df["salary"] > 150000) | (df["gender"] == "Female")).count()

Running the preceding code displays the following:

Out[1]: 1216
Out[2]: 3607
  4. In Spark Scala, use the and and or operators to count the rows with a salary greater than 150,000 and a country of the United States, and then the rows with a salary greater than 150,000 or a country of the United States, with the following code:
df.filter($"salary" > 150000 && $"country" === "United States").count()

df.filter($"salary" > 150000 || $"country" === "United States").count()

Running the preceding code displays the following:

res1: Long = 48
res2: Long = 2439

In addition to the filter() method, Spark has a where() method. The filter() and where() methods are identical in form and function. The where() method gives more of a SQL feel, which we will explore in Chapter 3, SQL with Spark.
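For example, the following two PySpark statements are interchangeable (a small sketch using the df above):

df.filter(df["salary"] > 50000).show(4)
df.where(df["salary"] > 50000).show(4)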

Sorting Rows of a DataFrame

Sorting a DataFrame means choosing a column or columns and ordering all the rows of the DataFrame by the values in that column, either in ascending or descending order. If a column is numerical, the ordering is either from smallest to largest or largest to smallest. If a column has a string data type, the rows are ordered in alphabetical or reverse alphabetical order. However, with string columns, sorting in alphabetical order starts with blank values, then numerical values from zero to nine, and finally letters.

In Spark, to sort a DataFrame use the orderBy() method with column names as parameters. In addition, append asc for ascending order or desc for descending order to each column; in PySpark these are the methods asc() and desc().

A DataFrame can be sorted with multiple columns by adding comma-separated column names to the orderBy() method.

In PySpark, sort the DataFrame by the “birthdate” column in descending order by column attribute. Then sort the DataFrame by the “first_name” column in ascending order by column index as shown in the following code:

df.orderBy(df.birthdate.desc()).show()
df.orderBy(df["first_name"].asc()).show()

In Spark Scala, sort the DataFrame by the “birthdate” column in descending order using the $-notation, and then in ascending order using the column index, as shown in the following code:

df.orderBy($"birthdate".desc).show()
df.orderBy(df("birthdate").asc).show()

In PySpark, sort the DataFrame by “last_name” in descending order and also by “country” in ascending order. Then show the results of a DataFrame sorted by “title” in ascending order and then by “last_name” in descending order with the following code:

df.orderBy(df.last_name.desc(), df.country.asc()).show()

df.orderBy(df["title"].asc(), df["last_name"].desc()).show()

In Spark Scala, sort the DataFrame by “birthdate” and “country”, both in descending order, using only the $-notation as shown in the following code:
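import spark.implicits._

df.orderBy($"birthdate".desc, $"country".desc).show()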

Putting It All Together

In addition to supplying one operation at a time to a Spark DataFrame, we can also combine operations together. To accomplish this, follow these steps:

  1. Press the Return key after the first operation.
  2. On the new line, press the Tab key once to indent the code.
  3. Start the next operation on the DataFrame with a “dot”.

We are essentially chaining the different methods together.

Exercise 14: Combining Spark Methods

  1. In Spark Scala we can combine select, filter, and orderBy as shown in the following code:
df.select("id", "first_name", "last_name", "gender", "country", "birthdate", "salary")
  .filter(df("country") === "United States")
  .orderBy(df("gender").asc, df("salary").desc)
  .show()

Running the preceding code displays the following:

+---+----------+---------+------+-------------+----------+---------+
| id|first_name|last_name|gender|      country| birthdate|   salary|
+---+----------+---------+------+-------------+----------+---------+
|944|     Kelly|   Hanson|      |United States|  1/2/1969|     null|
|500|     Henry|Rodriguez|      |United States| 6/23/1995|     null|
|715|    Amanda|  Simmons|      |United States| 4/29/1971|     null|
|409|      Todd| Ferguson|      |United States|  4/8/1981|     null|
| 25|     Susan|   Turner|      |United States|          |     null|
|646| Elizabeth|  Wallace|      |United States| 3/26/1986|     null|
|243|    Thomas|  Bradley|      |United States| 7/30/1965|     null|
|931|    Amanda|  Ramirez|Female|United States|  5/4/1989|254948.72|
| 29|Jacqueline|   Holmes|Female|United States|          |247939.52|
|458|   Carolyn|  Jackson|Female|United States| 8/16/1992|236557.48|
|159|   Frances|      Cox|Female|United States| 8/27/1976|217993.12|
|748|  Virginia|   Tucker|Female|United States| 1/11/1988|204624.68|
|674|  Jennifer|    James|Female|United States|          |204551.91|
|721|     Julie|Fernandez|Female|United States| 5/25/1995|201968.29|
|875|   Shirley|  Vasquez|Female|United States| 6/10/1981| 201672.0|
|687|    Bonnie|   Weaver|Female|United States| 9/27/1985|200523.37|
|815|  Michelle|  Schmidt|Female|United States| 1/28/1990|199331.05|
|663|   Theresa|  Stevens|Female|United States|10/14/1956| 191439.2|
|422|     Diana| Crawford|Female|United States|          |185372.93|
|466|     Norma|Alexander|Female|United States|  1/8/1972|185213.38|
+---+----------+---------+------+-------------+----------+---------+
only showing top 20 rows

  2. In Python, chained methods work differently. The combined operation must either be wrapped in parentheses or each individual line must end with a backslash. The following code shows the parentheses-wrapped combined operation:
(df.select("id", "first_name", "last_name", "gender", "country", "birthdate", "salary")
  .filter(df["country"] == "United States")
  .orderBy(df["gender"].asc(), df["salary"].asc())
  .show())

The following code shows the backslash-escaped combined operation:

df.select("id", "first_name", "last_name", "gender", "country", "birthdate", "salary") \
  .filter(df["country"] == "United States") \
  .orderBy(df["gender"].asc(), df["salary"].asc()) \
  .show()

This exercise exposes the reader to the heart of Spark DataFrame operations: combining multiple commands into one powerful yet simple command to transform the DataFrame.

