Introduction

When working with Spark in a production setting, the data will most likely come from files stored in HDFS on a Hadoop cluster. Spark has built-in support for many different file types. In this section we will cover five of the most popular file types you will see when working with Spark in production: Parquet, ORC, JSON, Avro, and CSV. Some of these formats were created recently for the Big Data world, while others have been around for a while and have uses outside of Hadoop. Together they represent a good mix of file formats and showcase Spark's capabilities when dealing with files.

The remainder of chapter 1 uses an open source sample data set originally found at https://github.com/Teradata/kylo/tree/master/samples/sample-data. It contains folders with data files of each of the file types covered in this chapter. All of the folders were copied to HDFS with a path of /user/your_user_name/data. No data was changed from the original version.

The following example shows the steps on a Mac:

Clone the kylo GitHub open-source repository to a location on your local computer with the following code:

$ git clone https://github.com/Teradata/kylo.git

Upload the sample-data directory to your Hadoop edge node by using the scp command as shown in the following code:

$ scp -rp /Users/your_directory/…/kylo/samples/sample-data user_name@cluster_ip_address:/home/your_user_name

Once on the edge node, use the hadoop fs -put command to upload the directory of data from the edge node to a directory in HDFS as shown in the following code:

$ hadoop fs -put /home/your_user_name/sample-data /user/your_user_name/data
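
To verify the upload, list the new HDFS directory. This is just a sanity check, and the exact listing will depend on your cluster and user name:

$ hadoop fs -ls /user/your_user_name/data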

Creating DataFrames from Parquet Files

Parquet is an open-source data file format. Just as Comma Separated Value (CSV) is a file format, so is Parquet; however, Parquet was built from the ground up to be fast, efficient, and flexible. Parquet was originally designed for the Hadoop ecosystem, so it fits perfectly with Spark. Many traditional file formats, like CSV, are row-based, meaning every row must be read for each query or operation, which is costly and time consuming. Parquet instead uses a columnar format: it organizes data into columns rather than rows. Reading by column means queries and operations can skip over entire columns that aren't needed, which makes queries very fast.

Parquet files also hold the schema of the data within the files themselves. This means Spark doesn't have to infer or guess the schema; it can create DataFrames with the correct column names and data types. Lastly, Parquet uses advanced compression that physically shrinks the data so it takes up less space. For example, one TB of CSV files might only be a couple hundred GB as Parquet files. This makes storage cheaper because there is less data to hold.

Creating DataFrames from Parquet files is straightforward, since Parquet files already contain the schema. There are two main ways to create DataFrames from Parquet files, and the commands are the same in both PySpark and Spark Scala. Both options use the SparkSession variable, spark, as the starting point and then call the read method, which is used to read data from files and create DataFrames. The first option calls the parquet method, which is purposely designed to read Parquet files; its one parameter is the HDFS path to the Parquet file. The second option first calls the format method, which accepts a string parameter naming the type of files to be read, in this case "parquet", and then calls the load method, which takes the HDFS path to the files to be loaded.

Note:

In this chapter many of the Python and Scala Spark commands are identical. Since Scala uses the val keyword when creating variables and Python simply needs the variable name, this section uses "… =" to designate that a command can be written in either Python or Scala, to save space. The only difference is that Scala uses "val some_name =" while Python uses "some_name =".

Exercise 10: Creating DataFrames from Parquet Files

  1. To create a DataFrame using the parquet() method use the SparkSession variable and call the read method followed by the parquet() method. Then pass the full HDFS file path of the parquet file you would like to convert into a DataFrame with the following code in both PySpark and Spark Scala:
… = spark.read.parquet("hdfs://user/your_user_name/data/userdata1.parquet")
  2. To create a DataFrame using the load() method, use the SparkSession variable and call the read method followed by the format() method, which takes one parameter: the file type of the file to be converted into a DataFrame, in this case "parquet". Then call the load() method with the full HDFS file path to the Parquet file as shown in the following code:
… = spark.read.format("parquet").load("hdfs://user/your_user_name/data/userdata1.parquet")
  3. If we call .show(5) on any of the newly created DataFrames, it will output the first five rows. The data is made up and mimics users logging into a computer system; a short sketch for inspecting the embedded schema appears after the output. The output of the DataFrame would look like:
+-------------------+---+----------+---------+--------------------+------+--------------+----------------+------------+---------+---------+--------------------+--------+
|  registration_dttm| id|first_name|last_name|               email|gender|    ip_address|              cc|     country|birthdate|   salary|               title|comments|
+-------------------+---+----------+---------+--------------------+------+--------------+----------------+------------+---------+---------+--------------------+--------+
|2016-02-03 07:55:29|  1|    Amanda|   Jordan|    ajordan0@com.com|Female|   1.197.201.2|6759521864920116|   Indonesia| 3/8/1971| 49756.53|    Internal Auditor|   1E+02|
|2016-02-03 17:04:03|  2|    Albert|  Freeman|     afreeman1@is.gd|  Male|218.111.175.34|                |      Canada|1/16/1968|150280.17|       Accountant IV|        |
|2016-02-03 01:09:31|  3|    Evelyn|   Morgan|emorgan2@altervis...|Female|  7.161.136.94|6767119071901597|      Russia| 2/1/1960|144972.51| Structural Engineer|        |
|2016-02-03 00:36:21|  4|    Denise|    Riley|    driley3@gmpg.org|Female| 140.35.109.83|3576031598965625|       China| 4/8/1997| 90263.05|Senior Cost Accou...|        |
|2016-02-03 05:05:31|  5|    Carlos|    Burns|cburns4@miitbeian...|      |169.113.235.40|5602256255204850|South Africa|         |     null|                    |        |
+-------------------+---+----------+---------+--------------------+------+--------------+----------------+------------+---------+---------+--------------------+--------+
only showing top 5 rows
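
The schema stored in the Parquet file can also be inspected directly. The following is a minimal PySpark sketch, assuming the DataFrame from step 1 was assigned to a placeholder variable named parquet_df:

parquet_df.printSchema()

The output lists each column name along with the data type read from the file, with no schema inference involved.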

Creating DataFrames from ORC Files

The Optimized Row Columnar (ORC) file format was designed with two main goals: increasing data processing speeds and reducing file sizes. ORC is another open-source project built for the Hadoop ecosystem, specifically Hive. Like Parquet, ORC is a columnar file format, but ORC first groups rows into stripes, and within a stripe the data is stored in columns. Each stripe also has a footer that holds metadata about the columns in the stripe, including each column's data type, count, min, max, and sum. The ORC architecture is relatively complicated to wrap your mind around, and there is much more to it than this brief introduction, but the main point of ORC is that it is meant to be fast, and it is. ORC files also have great file compression, just like Parquet.

Source: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC#LanguageManualORC-FileStructure

Note

ORC files often don't have a file extension; the sample ORC files used here, such as userdata1_orc, do not.

In PySpark and Spark Scala a DataFrame is created from ORC files by calling the orc() method or the .format("orc").load() methods.

Following are examples of creating DataFrames from ORC files using the orc() method and the load() method. The code is the same between PySpark and Spark Scala:

… = spark.read.orc("hdfs://user/your_user_name/data/orc/userdata1_orc")
 
… = spark.read.format("orc").load("hdfs://user/your_user_name/data/orc/userdata1_orc")

All of the previous examples have created DataFrames from a single file, but Spark can just as easily create a single DataFrame from multiple files. Instead of providing a single file in the path, all that is required is to end the path at a directory that contains only files of a single file format and schema. The following example is shown for ORC files but works for all the other file formats.

In the following examples, instead of a single file, the path points to a directory containing only files of the same format. Also, notice the subtle difference between the two examples: in the first, the HDFS path ends in only the directory name, while in the second, the path has a trailing slash. Both produce the exact same DataFrame; it doesn't matter whether the trailing slash is present.

… = spark.read.orc("hdfs://user/your_user_name/data/orc")
 
… = spark.read.format("orc").load("hdfs://user/your_user_name/data/orc/")

Creating DataFrames from JSON

JSON, or JavaScript Object Notation, is a file format written as human-readable text in the form of key-value pairs. In JavaScript, an object looks like this:

var myObj = {name: "Craig", age: 34, state: "Arkansas"};

Suppose we had a JSON file, some_file.json, holding data equal to the preceding JavaScript object. The contents of that file would be {"name": "Craig", "age": 34, "state": "Arkansas"}. The structure of JSON makes it very easy for humans to read, understand, and even write. It turns out this structure is also very easy for computers to read and parse, which makes JSON the go-to format for exchanging data between applications and servers.

The preceding JSON example is very simple, but JSON supports complex nested structures, including objects, arrays, and many different data types. This makes it extremely flexible at holding any type of data. Because of its flexible structure and ease of use for both humans and computers, JSON is very popular.

Spark can automatically infer the schema from JSON data, which makes JSON very easy to use in Spark. The json() method also accepts almost twenty optional parameters, covering everything from error handling to leading-zero handling to data type configuration. This chapter won't cover all of the available parameters, but the official documentation links below outline all of them.

Following is the code for both PySpark and Spark Scala:

… = spark.read.json("hdfs://user/your_user_name/data/books1.json")
 
… = spark.read.format("json").load("hdfs://user/your_user_name/data/books1.json")

See the official documentation links for PySpark (https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=json#pyspark.sql.DataFrameReader.json) and Scala (https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader@json(paths:String*):org.apache.spark.sql.DataFrame) to view the complete list of optional parameters.

Creating DataFrames from Avro Files

The Avro file format is another open-source technology designed for the Hadoop world, but unlike Parquet and ORC, Avro is a row-based file format. Its row-based architecture and binary data format make it ideal for write-heavy workloads. If your use case is primarily focused on writing data, Avro is a good choice; file formats like Parquet and ORC are primarily designed for reading data.

Another major advantage of Avro is its schema. Avro stores its schema as JSON, and because the schema is JSON, fields can be added or removed over time. If we had an Avro file with three columns, "name", "age", and "state", the corresponding JSON schema could look like the following example. The JSON clearly delineates the table name, all of the column names and their data types, plus other metadata about how the record was generated, as shown in the following code:

    {
      "type" : "record",
      "name" : "json example",
      "doc" : "Schema generated"
      "fields" : [ {
        "name" : "name",
        "type" : [ "null", "string" ],
        "default" : null,
        "columnName" : "name",
        "sqlType" : "12"
      }, {
        "name" : "age",
        "type" : [ "null", "long" ],
        "default" : null,
        "columnName" : "age",
        "sqlType" : "93"
      }, {
        "name" : "state",
        "type" : [ "null", "string" ],
        "default" : null,
        "columnName" : "state",
        "sqlType" : "2"
      } ],
      "tableName" : "employees"
    }

In Spark there is no avro() method, so the only way to create a DataFrame from Avro files in PySpark and Spark Scala is to use the format("avro") and load() methods.

Following is the code for both PySpark and Spark Scala:

… = spark.read.format("avro").load("hdfs://user/your_user_name/data/userdata1.avro")

In addition to HDFS paths that include single files and directories, Spark also supports wildcards and collections of files & directories. Let’s explore these options using Avro files as an example. 

Wildcards

Spark supports wildcard (glob) patterns in the HDFS path. The two most useful wildcards are the * character, which matches zero or more characters, and the ? character, which matches a single character. In the preceding Avro example, suppose the data directory contained a mix of Avro files and other file formats, but we only wanted to create a DataFrame from the Avro files. The following example reads all files in the data directory that end in ".avro":

… = spark.read.format("avro").load("hdfs://user/your_user_name/data/*.avro")

Collections

Let's say the data directory consisted of ten files, userdata0.avro through userdata9.avro, and we wanted to create a DataFrame from only a subset of them. We could provide a comma-separated collection of the full HDFS paths to each file. For example:

… = spark.read.format("avro").load("hdfs://user/your_user_name/data/userdata1.avro,hdfs://user/data/userdata5.avro,hdfs://user/your_user_name/data/userdata8.avro")

The result would be one DataFrame with data from the three Avro files. This collection of paths will also work on any collection of files and directories. This assumes that all the files have the same schema. 
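
In PySpark specifically, the load() method also accepts a Python list of paths, which can be easier to build programmatically than one long comma-separated string. A minimal sketch, using avro_df as a placeholder name:

paths = ["hdfs://user/your_user_name/data/userdata1.avro",
         "hdfs://user/your_user_name/data/userdata5.avro",
         "hdfs://user/your_user_name/data/userdata8.avro"]
avro_df = spark.read.format("avro").load(paths)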

Creating DataFrames from CSV Files

Comma Separated Value (CSV) is a row-based, human-readable text format with the data values separated by commas. It is one of the simplest file formats. Because it is plain text, CSV files take up more space than the other file formats used with Spark, but they are simple to read and write, and most text editors can open them.

Creating a DataFrame from CSV files requires several parameters. The most important are the file separator, defined as sep, the inferSchema parameter, and the header parameter, which specifies whether the CSV file(s) have a header row of column names. In Spark, there is a major difference between PySpark and Spark Scala in how a DataFrame is created from CSV files.

Exercise 11: Creating DataFrames from CSV Files

  1. In PySpark, the three options consist of spark.read.csv("…"), spark.read.format("csv").load("…"), and spark.read.load("…", format="csv"). Use the following code for the spark.read.csv("…") option:
csv_pyspark_df1 = spark.read.csv("hdfs://user/your_user_name/data/csv/userdata1.csv"
  , sep=","
  , inferSchema="true"
  , header="true")
  2. Use the following code for the spark.read.format("csv").load("…") option:
csv_pyspark_df2 = spark.read.format("csv").load("hdfs://user/your_user_name/data/csv/userdata1.csv"
  , sep=","
  , inferSchema="true"
  , header="true")
  3. Use the following code for the spark.read.load("…", format="csv") option:
csv_pyspark_df3 = spark.read.load("hdfs://user/your_user_name/data/csv/userdata1.csv"
  , format="csv"
  , sep=","
  , inferSchema="true"
  , header="true")
  4. In Spark Scala, after calling the read method, use the option() method to add additional configurations. In Python the parameters are set inside the csv() or load() methods, but in Scala the HDFS path is the only parameter passed to those methods. See the following code for csv() and load(); a PySpark sketch using the same option() style appears after this exercise:
val csv_scala_df1 = spark.read
  .option("sep", ",")
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("hdfs://user/your_user_name/data/csv/userdata1.csv")
 
val csv_scala_df2 = spark.read
  .format("csv")
  .option("sep", ",")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("hdfs://user/your_user_name/data/csv/userdata1.csv")

CSV files are a very popular option for storing data, especially when data sizes are on the smaller side. This is because the data file is human readable and the files are easy to create. As a result you will create a lot of DataFrames from CSV files over your Spark career. 

Like the json() method, the csv() method has many parameters. See the following official documentation links for a complete list.

PySpark: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=json#pyspark.sql.DataFrameReader.csv

Scala: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader@csv(paths:String*):org.apache.spark.sql.DataFrame

