Introduction

Since the Spark 2.0 release, DataFrames have been the central technology for accomplishing tasks in Spark. At its essence, a DataFrame is an immutable but distributed group of data assembled into named columns with a set structure. Let’s look at each of these aspects of DataFrames individually in the next section.

A DataFrame is an immutable but distributed group of data assembled into named columns with a set structure

Spark DataFrame Characteristics

Distributed Group of Data

Spark’s principal data storage system is the Hadoop Distributed File System (HDFS). A Hadoop cluster consists of a single NameNode, which manages the cluster’s metadata and access, and multiple DataNodes, which store the data. In HDFS, each data file is split into blocks, replicated (three times by default), and stored across the DataNodes. Spark is designed to read and process this distributed data. So, if a DataFrame is created from data stored in HDFS, that data actually lives spread across the DataNodes of the Hadoop cluster.

Immutable

Spark DataFrames are immutable, which means they do not change. Adding columns, removing rows, renaming columns, performing calculations, or anything else done to a DataFrame does not actually change the original DataFrame. Instead, each time an operation is performed on a DataFrame, Spark creates an entirely new DataFrame behind the scenes with the change(s) applied. Even if you perform an operation on a DataFrame and give the result the same name, as in df = df.withColumn(…), Spark builds an entirely new DataFrame with the new column and then binds the name df to that new DataFrame. If a user then calls df.show() after the df = df.withColumn(…), because the Python or Scala code runs sequentially from top to bottom, the df.show() displays the DataFrame with the added column.
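
To see immutability in practice, here is a minimal PySpark sketch; the DataFrame, column names, and values are hypothetical, and the SparkSession setup used here is covered later in this section.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("immutability_demo").getOrCreate()

# A small, hypothetical DataFrame with two columns: id and price
df = spark.createDataFrame([(1, 10.0), (2, 20.0)], ["id", "price"])

# withColumn() does not modify df; it returns an entirely new DataFrame
df_with_tax = df.withColumn("price_with_tax", F.col("price") * 1.07)

df.printSchema()           # the original df is unchanged: id, price
df_with_tax.printSchema()  # the new DataFrame has the extra column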

It is very important to note that Spark computes all DataFrame transformations in a lazy fashion. This means that computations or operations on a DataFrame only actually happen when an action is performed. Examples of actions on a DataFrame include displaying, writing, counting, and collecting. For example, if we add a calculated column to a DataFrame, the column isn’t actually computed until we perform an action on the DataFrame, like displaying its top twenty rows with df.show(). For each operation performed on a DataFrame before an action is called, Spark essentially creates a blueprint, or execution plan: the source data plus the sequence of operations to apply to it. Each time a new operation is performed, Spark updates the blueprint with the new changes. Then, when an action is called, Spark executes the blueprint and a new DataFrame is produced.
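
Continuing the sketch above, the following lines illustrate lazy evaluation: the transformations only extend the execution plan, and nothing runs until an action is called.

# filter() and withColumn() are transformations: they only build up the plan
expensive = df.filter(F.col("price") > 15.0) \
    .withColumn("discounted", F.col("price") * 0.9)

# explain() prints the plan Spark has built so far without executing it
expensive.explain()

# Only an action such as show(), count(), or collect() triggers execution
expensive.show()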

Assembled into Named Columns

Spark DataFrames have structure. That structure consists of columns with column names. All data in a DataFrame is organized into named columns: even if there is just one column. For example, if the user creates a DataFrame with no column names, Spark will automatically give each column a name corresponding to its column number: _1, _2, _3, and so on.

Set Structure

Each column’s name and the data type of the column make up its structure, which is called a schema. A DataFrame’s schema may change, but it must be defined. This means every column must have a data type: integer, string, decimal, date, and so on.
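
As a quick preview (schemas are covered in depth in the next section), the schema of any DataFrame can be inspected with printSchema(). Using the spark session from the sketch above, with hypothetical data:

people_df = spark.createDataFrame([("Alice", 34), ("Bob", 41)], ["name", "age"])

# printSchema() shows the set structure: column names plus data types
people_df.printSchema()
# root
#  |-- name: string (nullable = true)
#  |-- age: long (nullable = true)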


To summarize these characteristics: Spark is built for action. Every operation in Spark is designed to accomplish tasks and help reveal insights in incredibly large data sets. DataFrames form the foundation for everything in Spark: organizing data, analyzing data, streaming data, performing machine learning on data, and much more.

But before we can do all of that, we first have to create DataFrames. Let’s jump right in.

Creating DataFrames Manually

The main way to create DataFrames in Spark is the createDataFrame() method, which is called on the SparkSession. Since Spark 2.0, the SparkSession has been the main entry point to all of Spark’s many capabilities. The SparkSession creates and exposes the SparkConf, SparkContext, and SQLContext to the entire application. The SparkSession variable is conventionally named “spark”, but any name can be used. A basic SparkSession in Spark Scala looks like:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
   .builder()
   .appName("some_app_name")
   .getOrCreate()

A basic SparkSession in PySpark looks like:

from pyspark.sql import SparkSession

spark = SparkSession \
   .builder \
   .appName("some_app_name") \
   .getOrCreate()

Creating DataFrames Manually with PySpark

The createDataFrame() method in PySpark has a structure like:

createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)

The optional parameters, schema, samplingRatio, and verifySchema, all deal with a DataFrame’s schema. A schema is information about the structure of the data. These parameters and more will be covered in the next section, Spark DataFrame Schemas. For now, we will only use the first parameter, data.

A schema is information about the structure of the data

The data parameter is required and accepts many kinds of input. A DataFrame can be manually created from various kinds of nested lists and tuples.

In Python, a list is an ordered group of items. Lists can hold objects of different data types, can be changed, and can be nested. A nested list is the easiest way to manually create a DataFrame in PySpark: each inner list becomes a row of the DataFrame.

Let’s start off by showing how to create a DataFrame from a nested Python list.

Exercise 1: Creating a DataFrame in PySpark from a Nested List

In this exercise we will create a DataFrame in PySpark from a given nested list.

  1. Create a Python variable hadoop_list that is a nested list with the following code:
    hadoop_list = [[1, "MapReduce"], [2, "YARN"], [3, "Hive"], [4, "Pig"], [5, "Spark"], [6, "Zookeeper"]]
    This will create a DataFrame with six rows and two columns.
  2. To create the DataFrame named hadoop_df we use the SparkSession variable spark (that we created) and call the createDataFrame() method passing only the nested list with the following code:
    hadoop_df = spark.createDataFrame(hadoop_list)
  3. Finally display the contents of the DataFrame using hadoop_df.show() and display the schema of the DataFrame in a tree structure using hadoop_df.printSchema() as shown in the following code:
hadoop_df.show()
hadoop_df.printSchema()

Running the preceding code will produce the following output:

+---+---------+
| _1|       _2|
+---+---------+
|  1|MapReduce|
|  2|     YARN|
|  3|     Hive|
|  4|      Pig|
|  5|    Spark|
|  6|Zookeeper|
+---+---------+
 
root
 |-- _1: long (nullable = true)
 |-- _2: string (nullable = true)

You have now created your first Spark DataFrame. In this exercise the DataFrame only has six rows, but a Spark DataFrame can scale to billions or even trillions of rows spread across a cluster. This is the power of Spark.

In the output did you notice anything that stood out? There are actually two things to note: 

  1. The column names were _1 and _2. This is because no column names were supplied when creating the DataFrame. Spark didn’t know what to call the columns, so _1 and _2 simply reflect each column’s position, going from left to right (a short sketch after this list shows one way to supply real names).
  2. The output of the printSchema() method shows that Spark correctly inferred the data type of each column. Spark figured out that the first column was of data type long, which is a 64-bit integer, and that the second column was a string. When the printSchema() method is called on a DataFrame, the output displays the schema in a tree format. The tree format shows the column hierarchy, column names, column data types, and whether each column is nullable. The printSchema() method takes no parameters.
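
As a small preview of the next section, one way to give those columns real names is to pass a list of column names as the second argument to createDataFrame(). The names used below are just illustrative:

hadoop_named_df = spark.createDataFrame(hadoop_list, ["id", "technology"])
hadoop_named_df.show()
hadoop_named_df.printSchema()
# The columns are now named id and technology instead of _1 and _2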

To display the contents of the DataFrame we call the method show() on the newly created DataFrame. The show() method looks like this:

show(n=20, truncate=True)

The show() method defaults to displaying the top twenty rows and also truncates each cell to the first twenty characters. To display more or less than the top twenty rows set the first parameter to any integer. To include all the characters of the cells, set the second parameter to False.

Displaying the contents of Spark DataFrames in PySpark:

address_data = [["Bob", "1348 Central Park Avenue"], ["Nicole", "734 Southwest 46th Street"], ["Jordan", "3786 Ocean City Drive"]]
address_df = spark.createDataFrame(address_data)
address_df.show()
+------+--------------------+
|    _1|                  _2|
+------+--------------------+
|   Bob|1348 Central Park...|
|Nicole|734 Southwest 46t...|
|Jordan|3786 Ocean City D...|
+------+--------------------+

Since the method defaults to displaying only the first twenty characters of each cell, the content is truncated.

To display all the characters for each cell set the second parameter to False and to limit the output to the first two rows, set the first parameter to 2, as shown in the following code:

address_df.show(2, False)

+------+-------------------------+
|_1    |_2                       |
+------+-------------------------+
|Bob   |1348 Central Park Avenue |
|Nicole|734 Southwest 46th Street|
+------+-------------------------+
only showing top 2 rows

Note:

The second parameter, truncate, can also take an integer. If set to an integer, show() displays at most that many characters for each cell.
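
For example, using the address_df created above, either of the following calls caps each cell at ten characters (output omitted):

# Keyword form: default twenty rows, ten characters per cell
address_df.show(truncate=10)

# Equivalent positional form
address_df.show(20, 10)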


In Python, a tuple is similar to a list except that it is wrapped in parentheses instead of square brackets and cannot be changed (it is immutable). Otherwise, lists and tuples behave very similarly. A nested tuple is a tuple inside another tuple.

Exercise 2: Creating a DataFrame in PySpark from a nested tuple

  1. Create a nested tuple called programming_languages with the following code:
    programming_languages = ((1, "Java", "Scalable"), (2, "C", "Portable"), (3, "Python", "Big Data, ML, AI, Robotics"), (4, "JavaScript", "Web Browsers"), (5, "Ruby", "Web Apps"))
  2. Construct a DataFrame called prog_lang_df with the following code:
    prog_lang_df = spark.createDataFrame(programming_languages)
  3. Display the five rows and set the truncate parameter to False so the entire contents of the cells will be shown. Also print the schema of the DataFrame with the following code:
prog_lang_df.show(5, False)
prog_lang_df.printSchema()
+---+----------+--------------------------+
|_1 |_2        |_3                        |
+---+----------+--------------------------+
|1  |Java      |Scalable                  |
|2  |C         |Portable                  |
|3  |Python    |Big Data, ML, AI, Robotics|
|4  |JavaScript|Web Browsers              |
|5  |Ruby      |Web Apps                  |
+---+----------+--------------------------+
 
root
 |-- _1: long (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: string (nullable = true)

In Python, a dictionary is a collection of key-value pairs wrapped in curly braces. Like a list, a dictionary is mutable, can grow or shrink in size, and can be nested. Each data element in a dictionary has a key and a value. To create a DataFrame out of dictionaries, all that is required is to wrap them in a list.

Exercise 3: Creating a DataFrame in PySpark from a list of dictionaries

  1. Create a list of dictionaries called top_mobile_phones. Inside the list make three comma separated dictionaries each with keys of “Manufacturer”, “Model”, “Year”, “Million_Units” as shown in the following code:
    top_mobile_phones = [{"Manufacturer": "Nokia", "Model": "1100", "Year": 2003, "Million_Units": 250}, {"Manufacturer": "Nokia", "Model": "1110", "Year": 2005, "Million_Units": 250}, {"Manufacturer": "Apple", "Model": "iPhone 6 & 6+", "Year": 2014, "Million_Units": 222}]
  2. Create a DataFrame called mobile_phones_df from the dictionary list as shown in the following code:
    mobile_phones_df = spark.createDataFrame(top_mobile_phones)
  3. Display the DataFrame and print the schema as shown in the following code:
mobile_phones_df.show()
mobile_phones_df.printSchema()
+------------+-------------+-------------+----+
|Manufacturer|Million_Units|        Model|Year|
+------------+-------------+-------------+----+
|       Nokia|          250|         1100|2003|
|       Nokia|          250|         1110|2005|
|       Apple|          222|iPhone 6 & 6+|2014|
+------------+-------------+-------------+----+
 
root
 |-- Manufacturer: string (nullable = true)
 |-- Million_Units: long (nullable = true)
 |-- Model: string (nullable = true)
 |-- Year: long (nullable = true)

Notice that we didn’t supply column names to the DataFrame, but names still appear. That is because dictionaries have keys, and those keys become the columns of the DataFrame (listed in alphabetical order of the keys), while the dictionary values become the cells. So, by using dictionaries, Spark can display the DataFrame column names.
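
Depending on your PySpark version, inferring a schema from plain dictionaries may produce a deprecation warning suggesting Row objects instead. A sketch of the same DataFrame built from Row objects looks like:

from pyspark.sql import Row

rows = [Row(Manufacturer="Nokia", Model="1100", Year=2003, Million_Units=250),
        Row(Manufacturer="Nokia", Model="1110", Year=2005, Million_Units=250),
        Row(Manufacturer="Apple", Model="iPhone 6 & 6+", Year=2014, Million_Units=222)]

mobile_phones_row_df = spark.createDataFrame(rows)
mobile_phones_row_df.show()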

Creating DataFrames Manually with Spark Scala

In Scala, the createDataFrame() method has only two parameters: the first is the actual data and the second is the schema. Two of the more popular options for creating a DataFrame manually in Scala are lists and Sequences. A list in Scala is different from a list in Python: a Scala list is an immutable collection of objects of the same data type. We can work around the same-type restriction by holding each row of data in a tuple, since a tuple can mix data types. In Scala, a tuple is just comma-separated data enclosed in parentheses. In the following exercise, the DataFrame will have two columns because each tuple holds only two values; we could add more columns by putting more values inside each tuple.

Exercise 4: Creating a DataFrame in Scala from a List

  1. Before we can create a DataFrame, we first need a list. In Scala, we can use the List() method to create a list. Create a list called reptile_species_state with three tuples of two values each inside the list as shown in the following code:
    val reptile_species_state = List(("Arizona", 97), ("Florida", 103), ("Texas", 139))
  2. Create a DataFrame named reptile_df by calling the createDataFrame() method on the SparkSession variable spark. Pass the reptile_species_state list as a parameter to the createDataFrame() method as shown in the following code:
    val reptile_df = spark.createDataFrame(reptile_species_state)
  3. Display the DataFrame and print the schema with the following code:
reptile_df.show()
reptile_df.printSchema
+-------+---+
|     _1| _2|
+-------+---+
|Arizona| 97|
|Florida|103|
|  Texas|139|
+-------+---+
 
root
 |-- _1: string (nullable = true)
 |-- _2: integer (nullable = false)
 
reptile_species_state: List[(String, Int)] = List((Arizona,97), (Florida,103), (Texas,139))
reptile_df: org.apache.spark.sql.DataFrame = [_1: string, _2: int]

As with Python, the createDataFrame() method in Scala correctly inferred the schema and created the DataFrame.

In Scala, a Sequence (Seq) and a list are very similar; in fact, a list is one implementation of a Sequence. The main difference is performance: different Sequence implementations provide fast access to their elements in different ways. In practice, for creating DataFrames you can treat them the same. A Sequence is created with Seq(), with the elements inside the parentheses separated by commas.

To add column names to a manually created DataFrame in Scala use the toDF() method. toDF() takes comma separated strings and converts the DataFrame with unnamed columns into a DataFrame with named columns. 

Exercise 5: Creating a DataFrame in Scala from a Sequence

  1. In Scala, a Sequence is created using “Seq()”. Make a Sequence called bird_species_state and inside construct three tuples as shown in the following code:
    val bird_species_state = Seq(("Alaska", 506), ("California", 683), ("Colorado", 496))
  2. Create a DataFrame called birds_df. But this time call the toDF() method with comma separated strings for column names as shown in the following code:
    val birds_df = spark.createDataFrame(bird_species_state).toDF("state","bird_species")
  3. Display the contents of the DataFrame and print the schema with the following code:
birds_df.show()
birds_df.printSchema
+----------+------------+
|     state|bird_species|
+----------+------------+
|    Alaska|         506|
|California|         683|
|  Colorado|         496|
+----------+------------+
 
root
 |-- state: string (nullable = true)
 |-- bird_species: integer (nullable = false)
 
bird_species_state: Seq[(String, Int)] = List((Alaska,506), (California,683), (Colorado,496))
birds_df: org.apache.spark.sql.DataFrame = [state: string, bird_species: int]

Like Python dictionaries, Scala Sequences appear everywhere in real code, so being comfortable with them will pay off. In this exercise we showed how to transform the data in a Sequence into a DataFrame.

In this section we showed how to create DataFrames from just the data. In the next section, we will explain how to create DataFrames in Scala and Python with a defined schema.

