How to Control File Count, Reducers and Partitions in Spark and Spark SQL

Image result for poorly packed boxes
Hopefully after this you’ll get why I chose this image of unevenly packed boxes for a post about Spark partitioning.

After years of working with engineers, analysts, data scientists and general users of big data technology, I have learned a constant: people want to control the number and size of files their job or query will output… and usually for good enough reasons:

  • they wish to share the data externally or load it into a tool (and too many files becomes a headache)
  • they’re encouraged to optimize the output of a job (perhaps creating many small files or not utilizing an appropriate block size)

Whatever the case may be, the desire to control the number of files for a job or query is reasonable – within, ahem, reason – and in general is not too complicated. And, it’s often a very beneficial idea.

However, a thorough understanding of distributed computing paradigms like Map-Reduce (a paradigm Apache Spark follows and builds upon) can help understand how files are created by parallelized processes. More importantly, one can learn the benefits and consequences of manipulating that behavior, and how to do so properly – or at least without degrading performance.

Controlling Initial Partition Count in Spark for an RDD

It’s actually really simple. If you’re reading a source and you want to convey the number of partitions you’d like the resulting RDD to have, you can simply include it as an argument:

val rdd= sc.textFile ("file.txt", 5)

I imagine most of you know that trick, and are looking more for how to control the final output of a job, and the number of files it will result in.

So let’s do that!

Controlling Reducer / File Count in Spark

Option 1: spark.default.parallelism

In the Execution Behavior section of the Apache Spark docs, you will find a setting called spark.default.parallelism– it’s also scattered across Stack Overflow threads – sometimes as the appropriate answer and sometimes not.

It controls, according to the documentation, the…

Default number of partitions in RDDs returned by transformations like joinreduceByKey, and parallelize when not set by user.

Default is 200.

To put it another way, this value controls the number of partitions an RDD (Resilient Distributed Dataset) will have when it is created by transformations. Recall that in Spark there are two key abstractions: transformations (logic that lazily produces new RDDs from existing RDDs) and actions (which force transformations to execute and render the data).

When working with Spark Core and RDDs, this setting will allow you to control the number of partitions an RDD will have after a reduce operation. If you’ve noticed how commonly you end up with Spark stages in your job that are exactly 200 tasks – now you know why!

You can alter this setting as you please. In doing so, you will instruct Spark to use this value as the partition count for any RDDs that are created from transformations, or more specifically in our case, functions that reduce (grouping, filtering, etc). What this will not necessarily control are the partition counts of RDDs that are created by reading a data source – this is most often a factor of the parallelization of the read, which is influenced by file/block counts, at least in HDFS). So don’t be surprised if it doesn’t change the partition count of an RDD created from reading some data (at least not initially – you can create a reduce step that will, however, conform to this setting).

So… if you find that the last step in your job/query is creating 200 files – you can obviously alter this setting to force Spark to use n partitions and therefore output n files. However, you must be wary of skew that might be present in your job, as well as overall data size. As you alter parallelism, especially when decreasing it , you run the risk of losing performance and defeating the purpose of distributed computing. If you drive this setting to 1 inappropriately, you’ll have quite the slow pipeline on your hands – and most certainly run into OOMs (out of memory errors) from pushing all your data to one host.

Tl;DR: spark.default.parallelism is useful for controlling the parallelism of RDDs created by transformations, which are things like joins and groupBys.

Option 2: repartition() or coalesce()

You can also use the built in transformations of repartition() and coalesce(). They achieve similar results but have very different approaches.

repartition() will shuffle data across nodes to achieve as even a balance in terms of data size as it can. Data, by default, is not shuffled by any particular value, it’s simply moved across nodes until a relative balance is achieved. You can choose to repartition(n) to any n count of partitions preferred, increasing or decreasing. You can also repartition on a column if you like.

coalesce(), like repartition(), will allow you to decrease the number of partitions for your data, but not increase. It does far less data movement than repartition, generally speaking.

The main difference between the two: repartition() does a full shuffle of the data and creates relatively equal sized partitions across hosts. coalesce() on the other hand combines existing partitions to try and avoid a significant shuffle.

A popular attempt to output one file from a job or query is to use .coalesce(1) on your data. It might not scale appropriately in all scenarios, but it certainly can in some! Use with caution.

Option 3: spark.sql.shuffle.partitions

spark.sql.shuffle.partitions is a helpful but lesser known configuration. It is very similar to spark.default.parallelism, but applies to SparkSQL (Dataframes and Datasets) instead of Spark Core’s original RDDs.

Its definition:

Configures the number of partitions to use when shuffling data for joins or aggregations.

Default is 200.

So while you can control the partition count of RDDs made from reduce operations using spark.default.parallelism, that setting doesn’t apply for Dataframes and Datasets (which use the SparkSQL API). For those, you’ll need to use spark.sql.shuffle.partitions.

Keep in mind that this will not change the default partition count for any old Dataframe or Dataset. It will only change the default partition count for Dataframes and Datasets that are the result of reduce computations: like joins and aggregations.

So in short, you must be invoking an operation that uses a shuffle to take advantage of this editable value.

Understanding the Output of your Job or Query

Because you are using a distributed computing framework to process your big data, any n number of hosts can take part in computation as well as output. But the general rule is:

You will have n number of files matching the n number of hosts that hold data at the time the write is requested. Therefore, all those hosts will write the data they have individually and in parallel.

Based on knowing that, it makes sense why the number of files would fluctuate based on the number of final hosts (usually reducers) holding data at the end.

Example: Basic Spark App (no reduce function)

Say this app reads data into Spark from somewhere and writes it somewhere else. Pretty simple. Your output, at least on distributed storage like HDFS, will be made of n files corresponding to the n partitions of your RDD, Dataframe or Dataset. The number of partitions in that initial object will be determined by either:

  • the number of source files being read (because one task will be assigned to each file / block and will create one partition in the resulting object)
  • telling spark explicitly how many partitions you want in that initial object (see Controlling Initial Partition Count in Spark for an RDD at the top of this article for a how-to)

The simplest options you have in a job like this is to:

  • change how many underlying files are in the source data (not easiest)
  • tell spark how many partitions you want before the read occurs (and since there are no reduce operations, partition count will remain the same)
  • use repartition or coalesce to manually alter the partition size of the consumed data before the write occurs

Using one of the above options, you’ll be able to easily control the size of your output.

Example: Basic Spark App (w/ reduce function)

Say this app reads data into Spark from somewhere and writes it somewhere else. But before it writes, it does a grouping operation (sum, max, etc). Your initial object will, just like the previous scenario, will have a partition count defined by either you, the method of consumption, or the data itself.

When the grouping (a transformation) occurs, the resulting data will have partitions defined by something like spark.default.parallelism or spark.sql.shuffle.partitions, or some other baked-in methodology. You of course now, because you’re reducing, have access to the options explained earlier, to control the partition count and therefore file count of the written data.

Again, those options are repartition or coalesce, or one of the aforementioned partition parallelization settings.

In Conclusion

Sometimes it makes sense to control and define the output of your job. Sometimes it doesn’t. Whatever the reason, always remember the golden rules of distributed storage (especially in the case of Hadoop and HDFS):

  • avoid small files when possible
  • especially avoid them in large quantities (namenode headaches)
  • aim for larger files in smaller quantities (try to match your block size)

But if you need to control the output for a specific reason: now you know how. 🙂

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s