Introduction

In the previous section, 2.1 DataFrame Data Analysis, we used US census data and processed the columns to create a DataFrame called census_df. After processing and organizing the data, we would like to save it as files for later use. In Spark, the most common place to save data is HDFS. As we saw in 1.3: Creating DataFrames from Files, we can read files from HDFS to create a DataFrame. Likewise, we can write a DataFrame to HDFS as files in different file formats. This section covers writing DataFrames to HDFS as Parquet, ORC, JSON, CSV, and Avro files.

We will use our census_df DataFrame to write to HDFS. To write a DataFrame, chain the write method onto the DataFrame name, in our case census_df. The write method exposes many options for writing data files, but the core method is save(). Its first parameter is the HDFS directory path you would like to write to. Spark and Hadoop make this easy because the directory path you specify doesn’t have to exist beforehand: at runtime, if the path doesn’t exist, HDFS creates it and places your files there. Here is a basic example in PySpark and Spark Scala of writing the census_df DataFrame to HDFS:

census_df.write.save("hdfs://…/some_parent_dir/another_directory")

Spark offers many options when writing DataFrames as files to HDFS. It lets us control whether data is appended or overwritten, which compression codec to use, and a host of other format-specific options. In this section we will explain writing DataFrames to HDFS as Parquet, ORC, JSON, CSV, and Avro file formats. As we cover each file format, we will introduce additional options that can be used when writing files to HDFS.

Writing DataFrames to HDFS as Parquet Files

There are two ways to write a DataFrame to HDFS as Parquet files: the parquet() method and the save() method. Using the parquet() method is the easiest and most straightforward: call the write method on a DataFrame and then call the parquet() method directly. The parquet() method takes the HDFS directory path where you would like the files saved. The PySpark and Spark Scala code looks like:

census_df.write.parquet("hdfs://…/spark_output/parquet_dir")

Note:

Writing a DataFrame to HDFS doesn’t produce any Spark console output. If the command does not produce an error, then the files were written as your command specified.

The other way to write a DataFrame to Parquet files is the save() method. The save() method allows more control over the final output and is used in conjunction with the format() method. The format() method specifies the output file format used by save(). It has one parameter, a string representing the file type: "parquet", "orc", "json", "csv", or "avro". In our case, we would like to write the census_df DataFrame as Parquet files to HDFS. The PySpark and Spark Scala code is:

census_df.write.format("parquet").save("hdfs://…/spark_output/parquet_dir2")

Using the save() method allows more options when writing data. One of these is the compression codec of the individual files written to HDFS. To set the compression, use the option() method, where the first parameter is "compression" and the second is the compression type.

The purpose of compressing files is to take up less space when storing the data. Files that take up less space are important because they cost less to store and are faster to send across the cluster’s network. In Spark there are multiple compression codecs to choose from when writing data, and each one trades compression ratio against speed, so performance will differ. Here are the compression options for the Parquet, ORC, and JSON file formats:

  • Parquet: none, uncompressed, snappy, gzip, lzo, brotli, lz4, & zstd
  • ORC: none, snappy, zlib, & lzo
  • JSON: none, bzip2, gzip, lz4, snappy, & deflate

As an example, if we wanted to write a DataFrame to HDFS as Parquet files compressed with gzip, the PySpark and Spark Scala code would look like:

census_df.write.format("parquet").option("compression", "gzip").save("hdfs://…/spark_output/parquet3")

Writing DataFrames to HDFS as ORC Files

Similar to writing Parquet files, there are two options when writing DataFrames as ORC files: the orc() method and the save() method. The orc() method simply saves a DataFrame to HDFS as ORC files.

PySpark and Spark Scala code:

census_df.write.orc("hdfs://…/spark_output/orc")

Using the save() method requires using .format("orc") to specify the files as ORC. The PySpark and Spark Scala code:

census_df.write.format("orc").save("hdfs://…/spark_output/orc2")

Another option that can be used with the save() method is the mode() method. The mode() method specifies the behavior when the data files or directories you are writing already exist. There are four options to pass to the mode() method as a string:

  • overwrite: Overwrite the existing data
  • append: Append contents of this DataFrame to existing data
  • ignore: Silently ignore this operation if data already exists
  • error or errorifexists: Default option, throw an exception at runtime if data already exists

A couple of examples of specifying the mode() method in PySpark and Spark Scala look like:

census_df.write.format("orc").mode("append").save("hdfs://…/spark_output/orc2")

census_df.write.format("orc").mode("overwrite").save("hdfs://…/spark_output/orc2")

Writing DataFrames to HDFS as JSON Files

Writing a DataFrame to HDFS as JSON files using the json() method in PySpark and Spark Scala looks like:

census_df.write.mode("overwrite").json("hdfs://…/spark_output/json")

The json() method has several other options for specifying how the JSON objects are written. The optional parameters include dateFormat, timestampFormat, encoding, and lineSep. To include multiple options in the writing process, chain multiple option() methods together to specify as many as you need.

In the PySpark and Spark Scala examples below we use multiple option() methods to set the JSON date format and the line separator.

PySpark:

census_df.write \
  .format("json") \
  .mode("overwrite") \
  .option("dateFormat", "yyyy/mm/dd") \
  .option("lineSep", "\n") \
  .save("hdfs://…/spark_output/json")

Spark Scala:

census_df.write
   .format("json")
   .mode("overwrite")
   .option("dateFormat", "yyyy/mm/dd")
   .option("lineSep", "\n")
   .save("hdfs://…/spark_output/json")

Writing DataFrames to HDFS as CSV Files

Writing CSV files to HDFS has over fifteen optional parameters. We will not cover all of them but will highlight some of the more common choices. See the official PySpark and Spark Scala API documentation for all the options.

In the following code we write the census_df DataFrame as CSV files with a comma separator, double quotes as the quote character, and a header row of column names.

PySpark:

(census_df.write
  .format("csv")
  .mode("overwrite")
  .option("sep", ",")
  .option("quote", '"')
  .option("header", "true")
  .save("hdfs://…/spark_output/csv"))

Spark Scala:

census_df.write
  .mode("overwrite")
  .option("sep", ",")
  .option("quote", "")
  .option("header", "true")
  .csv("hdfs://…/spark_output/csv")

Writing DataFrames to HDFS as Avro Files

There is no avro() method on the DataFrame write interface, so the only way to write a DataFrame to HDFS as Avro files is to use the save() method in conjunction with the format("avro") method.

PySpark

census_df.write \
  .format("avro") \
  .mode("overwrite") \
  .save("hdfs://…/spark_output/avro")

Spark Scala

census_df.write
  .format("avro")
  .mode("append")
  .save("hdfs://…/spark_output/avro")

Partitions

In Spark the word “partition” is used a lot and carries several different meanings. This section will cover partitions, repartitioning, and partitioning.

Partition

In Spark, the data in a DataFrame is held in memory, and if the size of the data exceeds the amount of available memory, the data is spilled over to the local disk drives of the worker nodes of the cluster. This local disk space is not the same as HDFS, though. Since Spark is designed to run on a distributed system of machines, the data of a DataFrame is split into smaller pieces and distributed across the memory and/or local storage of the worker nodes. In that sense, a Spark DataFrame is divided logically, i.e. partitioned. So a partition is an atomic collection of a subset of a DataFrame’s data residing in memory and/or on the local disk of a worker node. The purpose of DataFrame partitions is that data split into smaller chunks can be processed in parallel, which greatly speeds up computations.

A partition is an atomic collection of a subset of a DataFrame’s data residing in memory and/or on the local disk of a worker node

We can see how many partitions our DataFrame is split into by converting our DataFrame into an RDD with the help of the rdd and getNumPartitions() methods. Because, at their core, both RDDs and DataFrames are immutable and distributed collections of data, converting between them is easy and painless. The default number of partitions a DataFrame is split into depends largely on the total number of cores across the worker nodes of your Hadoop cluster. For example, if a DataFrame consisted of 100 gigabytes of data and the Hadoop cluster had a total of sixty-four cores, then the DataFrame would be split into approximately sixty-four partitions, each holding roughly 1.5 gigabytes of data. Also, there is no hard size limit on the amount of data in a partition: as long as the Hadoop cluster can hold the entire dataset, Hadoop and Spark will scale to hold any partition size.

The image below is a great visual representation of DataFrame partitions split up between the worker nodes of the cluster.

Link: https://stackoverflow.com/a/45705976

To get the number of partitions of a DataFrame, call the rdd method on your DataFrame and then call the getNumPartitions() method. In the examples below the PySpark DataFrame is split into sixteen partitions while the Scala DataFrame is split into eight partitions. Because both use the same census_df DataFrame, each PySpark partition simply holds less data; the total amount of data across all partitions is the same in both cases.

PySpark

census_df.rdd.getNumPartitions()

Output

Out[5]: 16

Spark Scala

census_df.rdd.getNumPartitions

Output

res5: Int = 8

Repartition

Spark allows us to control the number of DataFrame partitions. Repartitioning is increasing or decreasing the number of partitions in order to rebalance the data of an existing DataFrame. Repartitioning creates a new DataFrame that is hash partitioned, which means that rows are distributed as evenly as possible across the newly created partitions.

Repartitioning is increasing or decreasing the number of partitions in order to rebalance the data of an existing DataFrame

Why would we need to repartition a DataFrame? Spark does not repartition when filtering or otherwise reducing the size of a DataFrame. So, imagine we had a DataFrame with 1 billion rows of data split into 10,000 partitions, which is 100,000 rows of data per partition. If we dramatically filtered the data, or took a statistical sample that reduced the DataFrame to, say, 50,000 rows, the DataFrame would still have 10,000 partitions. That would leave 50,000 rows of data spread across 10,000 partitions, which is not very efficient. To optimize the new DataFrame, Spark has two methods, repartition() and coalesce(), that can be used to rebalance the partitions to a more appropriate number given the overall size of the DataFrame. However, the two methods are very different in how they work and in their parameters. Let’s look at coalesce() first.
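
Before moving on to coalesce(), here is a minimal PySpark sketch of the problem just described: a heavy filter shrinks the data but leaves the partition count unchanged. The "total_population" column is hypothetical and used only for illustration.

# Filter the DataFrame down to a small fraction of its rows
# ("total_population" is a hypothetical column for illustration only)
filtered_df = census_df.filter(census_df["total_population"] > 1000000)

# The partition count is unchanged even though far fewer rows remain
print(census_df.rdd.getNumPartitions())    # e.g. 16
print(filtered_df.rdd.getNumPartitions())  # still 16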

The coalesce() method returns a new DataFrame with the number of partitions specified as a parameter. However, there is a significant limitation with coalesce(): it can only reduce the number of partitions; it cannot increase the number of partitions of a DataFrame. If a user executes a coalesce() command specifying a larger number of partitions, the DataFrame will stay at its current partition count and the operation will do nothing. The reason coalesce() can only reduce partitions is that it doesn’t shuffle the data when repartitioning. Shuffling is the process of moving data between partitions. Shuffles are computationally expensive because Spark must touch the data in all the partitions and move it around. Since coalesce() doesn’t do a shuffle, it combines or stacks the current partitions into a smaller number of partitions. This allows the coalesce() method to operate very quickly compared to the other method, repartition().

Shuffling is the process of moving data between partitions
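
As a quick check of the “reduce only” behavior, here is a minimal PySpark sketch, assuming census_df currently has sixteen partitions: asking coalesce() for more partitions than the DataFrame already has is simply a no-op.

# coalesce() can only reduce the partition count; a larger number does nothing
bigger_df = census_df.coalesce(1000)
print(census_df.rdd.getNumPartitions())  # e.g. 16
print(bigger_df.rdd.getNumPartitions())  # still 16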

To partition the census_df DataFrame into four partitions with the coalesce() method:

PySpark

four_parts_df = census_df.coalesce(4)
four_parts_df.rdd.getNumPartitions()

Output

Out[6]: 4

Spark Scala

val four_parts_df = census_df.coalesce(4)
four_parts_df.rdd.getNumPartitions

Output

res6: Int = 4

The repartition() method can increase or decrease the number of internal partitions of a DataFrame because it performs a full data shuffle every time it runs. Take caution when repartitioning a very large DataFrame, because it could take a long time to complete. repartition() returns a new DataFrame according to the partition expression supplied by its parameters. It has two optional parameters that can be used together or separately, for a total of three separate configurations.

The first parameter is an integer that is the target number of partitions you would like to increase or decrease to. Here are examples of increasing and decreasing the number of partitions. 

PySpark

ten_parts_df = census_df.repartition(10)
ten_parts_df.rdd.getNumPartitions()

Output

Out[7]: 10

Spark Scala

val three_parts_df = census_df.repartition(3)
three_parts_df.rdd.getNumPartitions

Output

res7: Int = 3

The second option is to pass one or more column names to the repartition() method. It is important to note that when repartitioning the data of a DataFrame by a column or columns, Spark will by default create 200 partitions. Even if a DataFrame already had 100 internal partitions and we repartitioned it by a column, Spark would still create 200 partitions, some of which would be completely empty of data. This default of 200 is controlled by the configuration “spark.sql.shuffle.partitions”. We can verify the value by executing the following command in PySpark and Spark Scala:

spark.conf.get("spark.sql.shuffle.partitions")
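
If 200 partitions is more than your data needs, this default can be changed for the current session. Here is a minimal PySpark sketch (the value 50 is only an illustrative choice; the same calls work in Spark Scala):

# Lower the default partition count used by repartition(column) and other shuffles
spark.conf.set("spark.sql.shuffle.partitions", "50")

# Verify the new value
spark.conf.get("spark.sql.shuffle.partitions")  # returns '50'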

Let’s show some real examples of repartitioning by column. The repartition() method accepts column names as strings or as column expressions (as in the Scala example below).

PySpark

state_part_df = census_df.repartition("us_state")
state_part_df.rdd.getNumPartitions()

Output

Out[8]: 200

Using Spark Scala to partition by two columns:

val state_county_part = census_df.repartition(census_df("us_state"), census_df("county_name"))
state_county_part.rdd.getNumPartitions

Even though there are only fifty states, the output is still “200”:

res8: Int = 200

The last option is to combine the previous two options and specify the number of partitions along with a column. 

PySpark

state_50_parts = census_df.repartition(50, "us_state")
state_50_parts.rdd.getNumPartitions()

Output

Out[9]: 50

After setting the number of DataFrame partitions, another way to verify the partition count is to write the DataFrame to HDFS: the number of files written will equal the number of DataFrame partitions, for both the coalesce() and repartition() methods.

PySpark

parts_to_files = census_df.repartition(10)
print(parts_to_files.rdd.getNumPartitions())

parts_to_files.write.format("avro").mode("overwrite").save("hdfs://…/spark_output/parts_to_files")

Output

10

And if we list out the files in HDFS at /spark_output/parts_to_files we see something similar to the following:

spark_output/parts_to_files/part-00000-tid-164469901362115015-a3cd28ec-a8af-4f6c-80f4-f7d3406a6024-4113746-1-c000.avro
spark_output/parts_to_files/part-00001-tid-164469901362115015-a3cd28ec-a8af-4f6c-80f4-f7d3406a6024-4113747-1-c000.avro
spark_output/parts_to_files/part-00002-tid-164469901362115015-a3cd28ec-a8af-4f6c-80f4-f7d3406a6024-4113748-1-c000.avro
spark_output/parts_to_files/part-00003-tid-164469901362115015-a3cd28ec-a8af-4f6c-80f4-f7d3406a6024-4113749-1-c000.avro
spark_output/parts_to_files/part-00004-tid-164469901362115015-a3cd28ec-a8af-4f6c-80f4-f7d3406a6024-4113750-1-c000.avro
spark_output/parts_to_files/part-00005-tid-164469901362115015-a3cd28ec-a8af-4f6c-80f4-f7d3406a6024-4113751-1-c000.avro
spark_output/parts_to_files/part-00006-tid-164469901362115015-a3cd28ec-a8af-4f6c-80f4-f7d3406a6024-4113752-1-c000.avro
spark_output/parts_to_files/part-00007-tid-164469901362115015-a3cd28ec-a8af-4f6c-80f4-f7d3406a6024-4113753-1-c000.avro
spark_output/parts_to_files/part-00008-tid-164469901362115015-a3cd28ec-a8af-4f6c-80f4-f7d3406a6024-4113754-1-c000.avro
spark_output/parts_to_files/part-00009-tid-164469901362115015-a3cd28ec-a8af-4f6c-80f4-f7d3406a6024-4113755-1-c000.avro

Partitioning

Partitioning is completely different from the partitions of a DataFrame or from repartitioning those partitions. Partitioning is the physical writing of data as files in HDFS, organized into subdirectories corresponding to a column or columns in the data. It is mainly used to improve query performance by distributing the data horizontally, assuming the partitioning scheme matches the filters users are likely to apply.

The image below (taken from a great partitioning article) is a visual representation of the structure of partitioning. In this hypothetical example, the data has two levels of partitioning: by year and by month. Each column of blue folders corresponds to an actual column in the data: “date_year” and “date_month”. Each unique value in the “date_year” and “date_month” columns (2015, 2016, etc. or 01, 02, 03, etc.) gets its own directory in HDFS. All of the data files reside in the last (deepest) partition directory.

Link: https://i0.wp.com/www.datio.com/wp-content/uploads/2016/11/post-3.jpg?w=767&ssl=1

Partitioning is the physical writing of data as files in HDFS organized into subdirectories corresponding to a column or columns in the data

Partitioning uses the partitionBy() method when writing a DataFrame to HDFS. The partitionBy() method can be used in conjunction with the format(), mode(), and save() methods we introduced at the beginning of this section. Let’s look at an example.

PySpark

(census_df.write
  .format("parquet")
  .mode("overwrite")
  .partitionBy("us_state")
  .save("hdfs://…/spark_output/part_by_state"))

The partitionBy() method takes one or more column names as strings. As stated earlier, partitioning literally alters the layout of the output files in HDFS by writing the data into subdirectories based on a column. So in the previous example, we saved the data in a directory named part_by_state, which is located inside another directory called spark_output. If we were writing data to HDFS without partitioning, the data files (Parquet, ORC, etc.) would be saved directly inside the part_by_state directory. However, when partitioning, the part_by_state directory holds subdirectories corresponding to the column name or names provided in the partitionBy() method. So in our example, the contents of the part_by_state directory would be fifty-one directories for the fifty U.S. states plus the District of Columbia. Here are the first eleven directories (which are partitions) in the part_by_state directory:

us_state=Alabama
us_state=Alaska
us_state=Arizona
us_state=Arkansas
us_state=California
us_state=Colorado
us_state=Connecticut
us_state=Delaware
us_state=District of Columbia
us_state=Florida
us_state=Georgia
…

So the actual directory names are the strings “us_state=Alabama”, “us_state=California”, and so on. The string to the left of the equals sign is the column name, “us_state”, and the string to the right is a distinct value of that column. Within each state directory are one or more files of the specified format containing the rows of data where the column equals that particular partition value. In our case, the partition “us_state=Florida” holds all of the rows of the census_df DataFrame where the column “us_state” equals “Florida”. This is what it means to partition by a column.
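
This directory layout is what makes partitioned data fast to query: a filter on the partition column only has to read the matching directories. Here is a minimal PySpark sketch of reading the partitioned output back; the path mirrors the write example above, and the count() is just there to force the read.

# Read the partitioned Parquet data; Spark discovers "us_state" as a column
part_df = spark.read.parquet("hdfs://…/spark_output/part_by_state")

# A filter on the partition column lets Spark skip every other state's directory
florida_df = part_df.filter(part_df["us_state"] == "Florida")
florida_df.count()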

We can partition by multiple columns by passing multiple column-name strings, separated by commas, to the partitionBy() method. Here is a double-partitioned example in Spark Scala.

census_df.write
  .format("orc")
  .mode("overwrite")
  .partitionBy("us_state", "county_name")
  .save("hdfs://…/spark_output/part_by_state_orc")

When partitioning by multiple columns, each subsequent partition directory is nested inside the previous one. So in this example, the directory part_by_state_orc contains all 51 state partitions, and each state partition directory then holds a county partition for every distinct “county_name” value in that state. The output is a nested directory structure that can continue for as long as there are additional columns, as shown below. The actual files reside in each of the “county_name” directories inside each state directory.

us_state=Alabama/county_name=Autauga County
us_state=Alabama/county_name=Baldwin County
us_state=Alabama/county_name=Barbour County
us_state=Alabama/county_name=Bibb County
us_state=Alabama/county_name=Blount County
us_state=Alabama/county_name=Bullock County
us_state=Alabama/county_name=Butler County
us_state=Alabama/county_name=Calhoun County
us_state=Alabama/county_name=Chambers County
us_state=Alabama/county_name=Cherokee County
…

Partitioning is best used on really big data; small data (like our census data of 1.2 million rows) really doesn’t need to be partitioned in HDFS. Partition columns should be categorical or discrete columns because those columns have a finite and relatively low number of distinct values. The term for this is cardinality: cardinality is the number of distinct values in a group, and in our case the group is the column we would like to partition on. It is best for partition columns to have low cardinality, because a very large number of partition values means Spark spends so much time scanning partitions that it counteracts the benefits. A good rule of thumb is to pick a partition column with no more than about ten thousand distinct values. Overall, partitioning is a great strategy for optimizing the physical layout of your data; it can drastically increase query speed because entire sections of the data can be skipped.
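
A quick way to gauge a column’s cardinality before choosing it as a partition column is to count its distinct values. A minimal PySpark sketch:

# Count the distinct values of a candidate partition column;
# a low count (51 here: the fifty states plus the District of Columbia) makes it a good candidate
census_df.select("us_state").distinct().count()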

In our example above, US State and/or State County are good partition columns because they have low cardinality and the data is roughly evenly distributed across their values. As another example, if you have retail data, it is often a good idea to partition it by a date string column, for example “YYYY-MM-DD”. Since retail transactions are naturally split up by date, each new day gets its own partition, and since there are only 365 partitions in a year, the cardinality is low. But if you had world-wide retail sales, it might be a good idea to add a second partition by country. Another important consideration is to avoid partition columns that would leave only a small amount of data inside each partition. This is called “the small files problem”. A file is “small” when it is smaller than the HDFS block size, and a lot of small files is an even bigger problem because every file, directory, and block has to be represented in the cluster’s NameNode. It takes a lot of time and processing power to scan an extremely large number of small files; that is why Hadoop was designed to read large files. Furthermore, in Hadoop files are distributed, which means they are split up into smaller pieces; if the data files are small to begin with, Hadoop will take an incredibly long time scanning the data.

An example of the small files problem would be partitioning your data by hour when each hour only holds 5 MB of data. Two years of data would be almost 20,000 partitions, and if a user ran a query over those two years, Spark would have to read all of those partitions just to process a small amount of data. It would be better to pick a partition column with a lower cardinality in this case. Regardless, just remember: good partition columns are columns that naturally follow users’ queries and have low cardinality.
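
One common way to keep partitioned output from fragmenting into many small files is to repartition the DataFrame by the same column(s) before writing, so each output directory is written by a single task. Here is a hedged PySpark sketch; the output path is only illustrative.

# Repartition by the partition column first so all of a state's rows land in one task,
# producing one file per us_state directory instead of many small ones
(census_df
  .repartition("us_state")
  .write
  .format("parquet")
  .mode("overwrite")
  .partitionBy("us_state")
  .save("hdfs://…/spark_output/part_by_state_compact"))

Whether the extra shuffle is worth it depends on the data size, but it is a common pattern when a partitioned write would otherwise produce many tiny files.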

