Introduction

A schema is information about the data contained in a DataFrame. Specifically, the number of columns, column names, column data type, and whether the column can contain NULLs. Without a schema, a DataFrame would be a group of disorganized things. The schema gives the DataFrame structure and meaning.

Without a schema, a DataFrame would be a group of disorganized things. The schema gives the DataFrame structure and meaning.

Specifying the Schema of a DataFrame

In the previous section we introduced the createDataFrame() method. In PySpark, this method looks like:

createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)

After the required data parameter the first optional parameter is schema. The most useful options for the schema parameter include: None (or not included), a list of column names, or a StructType.

If schema is None or left out, then Spark will try to infer the column names and the column types from the data. If schema is a list of column names, then Spark will add the column names in the order specified and will try to infer the column types from the data. In both of these cases Spark uses the number of rows specified in the second optional parameter, samplingRatio, to infer the schema from the data. If not included or given None, then only the top row is used to infer the schema.

To illustrate, say we had some data with a variable named computer_sales with columns “product_code”, “computer_name”, and “sales”. The following illustrates all the options the createDataFrame() method can handle in PySpark.

The following code is used when only the data parameter is provided or the schema is set to None or left blank:

df1 = spark.createDataFrame(computer_sales)
df2 = spark.createDataFrame(computer_sales, None)

Both DataFrames are equivalent.

The following is used when the data parameter is specified along with a Python list of column names:

df3 = spark.createDataFrame(computer_sales, ["product_code", "computer_name", "sales"])

The following code is used when the data parameter is specified along with a Python list of column names and the first two rows will be used to infer the schema:

df4 = spark.createDataFrame(computer_sales, ["product_code", "computer_name", "sales"], 2)

The following is used to infer the schema from every row in the DataFrame. len() is a Python function that returns an integer of the number of values in a list. Since the number of values in the list computer_sales equals the number of rows in the DataFrame, the samplingRatio parameter will evaluate every row in the DataFrame to infer the schema:

df5 = spark.createDataFrame(computer_sales, ["product_code", "computer_name", "sales"], len(computer_sales))

Exercise 6: Creating a DataFrame in PySpark with only named columns

  1. Create a nested list called home_computers as shown in the following code:
    home_computers = [["Honeywell", "Honeywell 316#Kitchen Computer", "DDP 16 Minicomputer", 1969], ["Apple Computer", "Apple II series", "6502", 1977], ["Bally Consumer Products", "Bally Astrocade", "Z80", 1977]]
  2. Create a DataFrame but this time the column names of the DataFrame are given explicitly as a list in the second parameter as shown in the following code:
    computers_df = spark.createDataFrame(home_computers, ["Manufacturer", "Model", "Processor", "Year"])
    Since the third parameter samplingRatio is not included, Spark uses the first row of data to infer the data types of the columns.
  3. Show the contents of the DataFrame and display the schema with the following code:
    computers_df.show()
    computers_df.printSchema()

Running the preceding code displays the following:

+--------------------+--------------------+-------------------+----+
|        Manufacturer|               Model|          Processor|Year|
+--------------------+--------------------+-------------------+----+
|           Honeywell|Honeywell 316#Kit...|DDP 16 Minicomputer|1969|
|      Apple Computer|     Apple II series|               6502|1977|
|Bally Consumer Pr...|     Bally Astrocade|                Z80|1977|
+--------------------+--------------------+-------------------+----+
 
root
 |-- Manufacturer: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Processor: string (nullable = true)
 |-- Year: long (nullable = true)

Columns names make DataFrames exceptionally useful. The PySpark API makes adding columns names to a DataFrame very easy. 

Schemas, StructTypes, and StructFields

The most rigid and defined option for schema is the StructType. It is important to note that the schema of a DataFrame is a StructType. If a DataFrame is created without column names and Spark infers the data types based upon the data, a StructType is still created in the background by Spark. 

A manually created PySpark DataFrame, like the following example, still has a StructType schema:

computers_df = spark.createDataFrame(home_computers)
computers_df.show()
+--------------------+--------------------+-------------------+----+
|        Manufacturer|               Model|          Processor|Year|
+--------------------+--------------------+-------------------+----+
|           Honeywell|Honeywell 316#Kit...|DDP 16 Minicomputer|1969|
|      Apple Computer|     Apple II series|               6502|1977|
|Bally Consumer Pr...|     Bally Astrocade|                Z80|1977|
+--------------------+--------------------+-------------------+----+

The schema can be displayed in PySpark by calling the schema method on a DataFrame like: computers_df.schema

Running the preceding code displays the following:

Out[1]: StructType(List(StructField(_1,StringType,true),StructField(_2,StringType,true),StructField(_3,StringType,true),StructField(_4,LongType,true)))

To recap, the schema of a DataFrame is stored as a StructType object. The StructType object consists of a list of StructFields. The StructFields are the information about the columns of a DataFrame. Use the following code for Spark Scala:

import org.apache.spark.sql.types.{StructType, StructField}
 
val schema = StructType(
  List(
    StructField("Manufacturer", IntegerType, true),
    StructField("Model", StringType, true),
    StructField("Processor", StringType, true),
    StructField("Year", LongType, true)
  )
)

Use the following code for PySpark:

from pyspark.sql.types import StructType, StructField
 
schema = StructType([
  StructField("Manufacturer", StringType(), True),
  StructField("Model", StringType(), True),
  StructField("Processor", StringType(), True),
  StructField("Year", LongType(), True)
])

It is important to note that the schema of a DataFrame is a StructType

StructFields are objects that correspond to each column of the DataFrame and are constructed with the name, data type, and a boolean value of whether the column can contain NULLs. The second parameter of a StructFieldis the columns data type: string, integer, decimal, datetime, and so on. To use data types in Spark the types module must be called. Imports in Scala and Python are code that is not built-in the main module. For example, DataFrames are part of the main code class. But ancillary things like data types and functions are not and must be imported to be used in your file. The following code is the Scala import for all of the data types:

import org.apache.spark.sql.types._

The following code is the Python import for all of the data types:

from pyspark.sql.types import *

Note:

StructType and StructField are actually Spark data types themselves. They are included in the preceding data imports that import all the members of the data types class. To import StructType and StructField individually use the following code for Scala Spark:
import org.apache.spark.sql.types.{StructType, StructField}

To import StructType and StructField individually use the following code for PySpark:
from pyspark.sql.types import StructType, StructField

Exercise 7: Creating a DataFrame in PySpark with a Defined Schema

  1. Import all the PySpark data types at once (that include both StructType and StructField) and make a nested list of data with the following code:
from pyspark.sql.types import *
customer_list = [[111, "Jim", 45.51], [112, "Fred", 87.3], [113, "Jennifer", 313.69], [114, "Lauren", 28.78]]
  1. Construct the schema using the StructType and StructField. First make a StructType which holds a Python list as shown in the following code:
customer_schema = StructType([
  StructField("customer_id", LongType(), True),
  StructField("first_name", StringType(), True),
  StructField("avg_shopping_cart", DoubleType(), True)
])

Inside the list are the individual StructField which make up the columns of the DataFrame.

  1. Make the DataFrame with the following code:
    customer_df = spark.createDataFrame(customer_list, customer_schema)
    In the createDataFrame() method the first parameter is the data and the second is the schema.
  2. Display the contents and the schema of the DataFrame with the following code:
    customer_df.show()
    customer_df.printSchema()

Running the preceding code displays the following:

+-----------+----------+-----------------+
|customer_id|first_name|avg_shopping_cart|
+-----------+----------+-----------------+
|        111|       Jim|            45.51|
|        112|      Fred|             87.3|
|        113|  Jennifer|           313.69|
|        114|    Lauren|            28.78|
+-----------+----------+-----------------+
 
root
 |-- customer_id: long (nullable = true)
 |-- first_name: string (nullable = true)
 |-- avg_shopping_cart: double (nullable = true)

Spark schemas are the structure or the scaffolding of a DataFrame. Just like a building would collapse without structure, so too would a DataFrame. Without structure Spark wouldn’t be able to scale to trillions and trillions of rows. 

Spark schemas are the structure or the scaffolding of a DataFrame

Another way to create a DataFrame in Spark Scala is to use Rows wrapped in a Sequence. We have already introduced the Sequence object. The Row object is one row in a DataFrame with each value representing cells in different columns.

However, a Sequence of Rows requires several things in order to be created into a DataFrame. First a Sequence of Rows must be “parallelized” which means the data that is held in the driver is distributed to all the nodes of the clusters. This is done by calling the parallelize() method on the SparkContext (which is created from the SparkSession). An example would look like spark.sparkContext.parallelize(data). The second thing required to make a DataFrame out of a Sequence of Rows is providing a StructType schema. Unfortunately, not passing a schema or using the toDF() method causes an error.

Exercise 8: Creating a DataFrame in Spark Scala with a Defined Schema

  1. Import the Scala data types and then make a Sequence of Rows called grocery_items. The Seq() wraps around each Row() object. Each comma separated value in the Row() will be a column in the DataFrame:
import org.apache.spark.sql.types._
 
val grocery_items = Seq(
  Row(1, "stuffing", 4.67),
  Row(2, "milk", 3.69),
  Row(3, "rolls", 2.99),
  Row(4, "potatoes", 5.15),
  Row(5, "turkey", 23.99)
)
  1. Construct the schema using the StructType and StructFields. First make a StructType which holds a Scala list. Inside the list are the individual StructFields which make up the columns of the DataFrame
val grocery_schema = StructType(
  List(
    StructField("id", IntegerType, true),
    StructField("item", StringType, true),
    StructField("price", DoubleType, true)
  )
)
  1. Make a DataFrame using the createDataFrame() method. But when using the Sequence of Rows we first have to paralize the data. Inside createDataFrame() call the SparkSession by the “spark” variable then call sparkContext and then finally parallelize followed by the data variable grocery_items. The second parameter is the StructType grocery_schema:
val grocery_df = spark.createDataFrame(spark.sparkContext.parallelize(grocery_items), grocery_schema)

Display the data and print the schema:

grocery_df.show()
grocery_df.printSchema()
+---+--------+-----+
| id|    item|price|
+---+--------+-----+
|  1|stuffing| 4.67|
|  2|    milk| 3.69|
|  3|   rolls| 2.99|
|  4|potatoes| 5.15|
|  5|  turkey|23.99|
+---+--------+-----+
 
root
 |-- id: integer (nullable = true)
 |-- item: string (nullable = true)
 |-- price: double (nullable = true)
 
import org.apache.spark.sql.types._
grocery_items: Seq[org.apache.spark.sql.Row] = List([1,stuffing,4.67], [2,milk,3.69], [3,rols,2.99], [4,potatoes,5.15], [5,turkey,23.99])
grocery_schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(item,StringType,true), StructField(price,DoubleType,true))
grocery_df: org.apache.spark.sql.DataFrame = [id: int, item: string ... 1 more field]

The add() Method

The add() method can be used interchangeably and in addition to the StructFields objects on a StructType. The add() method takes the same parameters as the StructField object. The following schemas are all equivalent representations:

The PySpark code is as follows:

from pyspark.sql.types import StructType, StructField, StringType, LongType
 
schema1 = StructType([
  StructField("id_column", LongType(), True),
  StructField("product_desc", StringType(), True)
])
 
schema2 = StructType().add("id_column", LongType(), True).add("product_desc", StringType(), True)
 
schema3 = StructType().add(StructField("id_column", LongType(), True)).add(StructField("product_desc", StringType(), True))

We can confirm that the two schemas are equivalent by comparing if the schema variables are equal to each other and printing the results in Python:

print(schema1 == schema2)
print(schema1 == schema3)

Running the preceding code will display the following:

True
True

In Spark Scala, the add() method is exactly the same except for a blank Scala list must be placed inside the blank StructType. See variables schema2 and schema3 for the only difference between Spark Scala and PySpark:

import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}
 
val scala_schema1 = StructType(List(
  StructField("id_column", LongType, true),
  StructField("product_desc", StringType, true)
))
 
val scala_schema2 = StructType(List()).add("id_column", LongType, true).add("product_desc", StringType, true)
 
val scala_schema3 = StructType(List()).add(StructField("id_column", LongType, true)).add(StructField("product_desc", StringType, true))

We can confirm that the two schemas are equivalent by comparing if the schema variables are equal to each other and printing the results in Scala:

println(scala_schema1 == scala_schema2)
println(scala_schema1 == scala_schema3)

Running the preceding code will display the following:

true
true
import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}
scala_schema1: org.apache.spark.sql.types.StructType = StructType(StructField(id_column,LongType,true), StructField(product_desc,StringType,true))
scala_schema2: org.apache.spark.sql.types.StructType = StructType(StructField(id_column,LongType,true), StructField(product_desc,StringType,true))
scala_schema3: org.apache.spark.sql.types.StructType = StructType(StructField(id_column,LongType,true), StructField(product_desc,StringType,true))

The add() method can be used when adding a new column to already existing DataFrame. In the following example, sales_schema is the schema of a DataFrame. And we want to add another column to the DataFrame. We can use the add() method on a StructField to the original StructType to create a brand new StructType.

Exercise 9: Using the add() Method

  1. Create a schema of a StructType named sales_schema that has two columns. The first column “user_id” is a long data type and cannot be nullable. The second column “product_item” is a string data type and can be nullable. Following is the code for PySpark:
from pyspark.sql.types import StructType, StructField, StringType, LongType
 
sales_schema = StructType([
  StructField("user_id", LongType(), False),
  StructField("product_item", StringType(), True)
])

Following is the code for Scala Spark:

import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}
 
val sales_schema = StructType(List(
  StructField("user_id", LongType, true),
  StructField("product_item", StringType, true)
))
  1. Create a StructField called sales_field that has a column name of “total_sales” with a long data type that can be nullable. Following is the code for PySpark:
    sales_field = StructField("total_sales", LongType(), True)
    Following is the code for Scala Spark:
    val sales_field = StructField("total_sales", LongType, true)
  2. Use the add() method to add the sales_field StructField to the sales_schema StructType. Following is the code for PySpark:
    another_schema = sales_schema.add(sales_field)
    Following is the code for Scala Spark:
    val another_schema = sales_schema.add(sales_field)
  3. Print out the new schema variable to verify it looks correctly

Following is the code for PySpark:

print(another_schema)

StructType(List(StructField(user_id,LongType,false),StructField(product_item,StringType,true),StructField(total_sales,LongType,true)))

Following is the code for Scala Spark:

println(another_schema)

StructType(StructField(user_id,LongType,true), StructField(product_item,StringType,true), StructField(total_sales,LongType,true))
import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}
sales_schema: org.apache.spark.sql.types.StructType = StructType(StructField(user_id,LongType,true), StructField(product_item,StringType,true))
sales_field: org.apache.spark.sql.types.StructField = StructField(total_sales,LongType,true)
another_schema: org.apache.spark.sql.types.StructType = StructType(StructField(user_id,LongType,true), StructField(product_item,StringType,true), StructField(total_sales,LongType,true))

Use the add() method when adding columns to a DataFrame

You can use the schema method on a DataFrame in conjunction with the add() method to add new fields to the schema of an already existing DataFrame. In Spark, a DataFrame’s schema is a StructType. In the preceding exercise we manually specified the schema as StructType. Spark has a shortcut: the schema method. The method schema can be called on an existing DataFrame to return its schema, that is a StructType. So in Spark Scala or PySpark you would call some_df.schema to output the StructType schema.

Here is an example in PySpark of the output of the schema method on a DataFrame returning a StructType schema:

print(customer_df.schema)

StructType(List(StructField(customer_id,LongType,true),StructField(first_name,StringType,true),StructField(avg_shopping_cart,DoubleType,true)))

Here is an example in Spark Scala of the output of the schema method on a DataFrame returning a StructType schema:

println(grocery_df.schema)

StructType(StructField(id,IntegerType,true), StructField(item,StringType,true), StructField(price,DoubleType,true))

So, with the schema method you don’t have to manually create the StructType to add a new column. Just call the schema method on the DataFrame and then use the add method to add a column as a StructField. Following is the code for PySpark:

final_schema = customer_df.schema.add(StructField("new_column", StringType(), True))
 
print(final_schema)

Running the preceding code displays the following:

StructType(List(StructField(customer_id,LongType,true),StructField(first_name,StringType,true),StructField(avg_shopping_cart,DoubleType,true),StructField(column_x,StringType,true),StructField(column_x,StringType,true),StructField(new_column,StringType,true),StructField(new_column,StringType,true)))

Following is the code for Scala Spark:

val final_schema = grocery_df.schema.add(StructField("new_column", StringType, true))
 
println(final_schema)

Running the preceding code displays the following:

StructType(StructField(id,IntegerType,true), StructField(item,StringType,true), StructField(price,DoubleType,true), StructField(new_column,StringType,true))
final_schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(item,StringType,true), StructField(price,DoubleType,true), StructField(new_column,StringType,true))

In the Modifying DataFrames section of this chapter we will see how to add new columns of data to a DataFrame.

Return column names from a schema

Use the fieldNames method on a StructType to return a list or array of the column names. This is an easy way to return the column names of a DataFrame. The fieldNames method can be called on a StructType or after the schema method on a DataFrame. The only difference between Spark Scala and PySpark is that PySpark requires trailing parenthesis () and Spark Scala omits the parenthesis. 

In PySpark call the fieldNames() method on a schema and on the DataFrame to return the column names of the schema:

print( customer_df.schema.fieldNames() )
print( customer_schema.fieldNames() )

The output is a list of the column names of the DataFrame:

['customer_id', 'first_name', 'avg_shopping_cart'] 
['customer_id', 'first_name', 'avg_shopping_cart']

In Spark Scala call the fieldNames() method on a schema and on the DataFrame to return the column names of the schema. In Scala we cannot directly print the contents of an array like we can print a list in Python. So here we create a variable and Scala outputs the contents of the array:

val columns1 = grocery_df.schema.fieldNames
val columns2 = grocery_schema.fieldNames

The output is an array of the column names of the DataFrame:

columns1: Array[String] = Array(id, item, price) 
columns2: Array[String] = Array(id, item, price)

Nested Schemas

So far, we have dealt with flat and orderly DataFrame schema. But Spark supports nested columns where a column can contain more sets of data. 

So far, we have dealt with flat and orderly DataFrame schema. But Spark supports nested columns where a column can contain more sets of data. Suppose we had a data set that looked like the following Python dictionary or JSON object: 

{"id":101,"name":"Jim","orders":[{"id":1,"price":45.99,"userid":101},{"id":2,"price":17.35,"userid":101}]},{"id":102,"name":"Christina","orders":[{"id":3,"price":245.86,"userid":102}]},{"id":103,"name":"Steve","orders":[{"id":4,"price":7.45,"userid":103},{"id":5,"price":8.63,"userid":103}]}

This data set would be the result of some imaginary sales tables that was joined to an orders table. We will look at joining DataFrames together in Chapter 3SQL with Spark.

It is difficult to see from the nested dictionary but there are three columns: idname, and orders. But orders is special, because it is a list of lists. In Python we can directly use this data by wrapping it in brackets as mentioned earlier.

nested_sales_data = [{"id":101,"name":"Jim","orders":[{"id":1,"price":45.99,"userid":101},{"id":2,"price":17.35,"userid":101}]},{"id":102,"name":"Christina","orders":[{"id":3,"price":245.86,"userid":102}]},{"id":103,"name":"Steve","orders":[{"id":4,"price":7.45,"userid":103},{"id":5,"price":8.63,"userid":103}]}]

If we used this list and made a DataFrame without specifying a schema, the output would not be very usable or readable. The following PySpark code uses the preceding nested JSON data to make a Spark DataFrame. The DataFrame and schema is displayed to demonstrate what can happen when you make a DataFrame with nested data without a schema:

ugly_df = spark.createDataFrame(nested_sales_data)
ugly_df.show(20, False)
ugly_df.printSchema()
+---+---------+------------------------------------------------------------------------+
|id |name     |orders                                                                  |
+---+---------+------------------------------------------------------------------------+
|101|Jim      |[[id ->, userid ->, price -> 45.99], [id ->, userid ->, price -> 17.35]]|
|102|Christina|[[id ->, userid ->, price -> 245.86]]                                   |
|103|Steve    |[[id ->, userid ->, price -> 7.45], [id ->, userid ->, price -> 8.63]]  |
+---+---------+------------------------------------------------------------------------+
 
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: double (valueContainsNull = true)

The output is not readable or user friendly with the “->” characters and Spark is trying to make a map of the data. Let’s add a schema to tell Spark exactly how we want to structure the DataFrame. The following PySpark code demonstrates the results of nested data when using a schema:

from pyspark.sql.types import *
 
orders_schema = [
  StructField("id", IntegerType(), True),
  StructField("price", DoubleType(), True),
  StructField("userid", IntegerType(), True)
]
 
sales_schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("name", StringType(), True),
  StructField("orders", ArrayType(StructType(orders_schema)), True)
])

Here we called the order_schema inside the sales_schema. This shows how versatile schemas can be and how easy it is in Spark to construct complex schemas. Now let’s make a DataFrame that is readable and well structured as shown in the following code:

nested_df = spark.createDataFrame(nested_sales_data, sales_schema)
 
nested_df.show(20, False)
nested_df.printSchema()
+---+---------+----------------------------------+
|id |name     |orders                            |
+---+---------+----------------------------------+
|101|Jim      |[[1, 45.99, 101], [2, 17.35, 101]]|
|102|Christina|[[3, 245.86, 102]]                |
|103|Steve    |[[4, 7.45, 103], [5, 8.63, 103]]  |
+---+---------+----------------------------------+
 
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- price: double (nullable = true)
 |    |    |-- userid: integer (nullable = true)

In the next section we will move on from manually created DataFrames to creating DataFrames from files stored in Hadoop.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

This site uses Akismet to reduce spam. Learn how your comment data is processed.