Managing RPC and Stack Overflow Errors from High Iterations (maxIters) in Spark ALS

Landon Robinson
Published in Hadoopsters
Apr 15, 2022



Article Summary

Recent experiences assisting with an implementation of Spark ALS resulted in two obscure errors related to RPC Disconnects and StackOverflows. If this is happening to you, this post might help.

Context

I was recently helping a peer with their implementation of Spark ALS. They were using ALS (MLlib) in PySpark in a Databricks notebook on AWS.

They had completed their ALS-based Spark pipeline and it was producing the desired results. However…

The Change

Though their code was complete, they determined that the parameters used in the build step of the ALS model needed adjusting, so they made a change: they increased the maxIters parameter of the model-building function (the parameter is spelled maxIter in the pyspark.ml ALS API). Specifically, they increased maxIters from 3 to 10 (10 is the default in ALS).

That still worked fine! :)

But then they increased maxIters from 10 to 100.

That did not work fine. :(

They started experiencing strange errors on simple Spark actions like .count() and .collect(), even on seemingly small DataFrames.

Note: .count() results in a .collect() in the background, so .collect() appeared to be a key culprit in triggering these errors.

The errors started out like this:

Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 40.0 failed 1 times, most recent failure: Lost task 0.0 in stage 40.0 (TID 35, localhost): java.lang.StackOverflowError

First, we tried to configure our way out of these errors by setting and tuning parameters like:

  • spark.driver.maxResultSize (limits the total size of serialized results of all partitions for each Spark action (e.g. collect) in bytes)
  • spark.driver.memory (sets the amount of memory for the driver process)
  • spark.network.timeout (sets the default timeout for all network interactions. This value is used in place of spark.storage.blockManagerHeartbeatTimeoutMs, spark.shuffle.io.connectionTimeout, spark.rpc.askTimeout or spark.rpc.lookupTimeout if they are not configured)
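For reference, here is a minimal sketch of how those three settings might be applied when building a session. The values, the DRIVER_TUNING name, and the build_session helper are all hypothetical placeholders for illustration, not the values we used.

```python
# Hypothetical values for illustration only; tune these for your own cluster.
DRIVER_TUNING = {
    "spark.driver.maxResultSize": "4g",  # cap on serialized results returned to the driver
    "spark.driver.memory": "16g",        # memory for the driver process
    "spark.network.timeout": "600s",     # default timeout for network interactions
}


def build_session(app_name="als-pipeline"):
    """Create (or reuse) a SparkSession with the settings above applied."""
    # Imported here so the settings can be inspected without Spark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    for key, value in DRIVER_TUNING.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

One caveat: spark.driver.memory only takes effect if it is set before the driver JVM starts, so in a notebook with an already-running session it most likely belongs in the cluster's Spark configuration rather than in code like this.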

They had little to no effect — we still experienced the same errors. We noticed the RPC error slightly less — but the StackOverflow error still consistently followed Spark actions like .count() or .collect() (again, both of which call the internal .collect() method).

After a less-than-fruitful few hours spent trying to massage the pipeline into functioning without these errors, we revisited the documentation for ALS to see if we were misconfiguring it at some point, and we particularly questioned the number of iterations being requested (100) compared to the default of 10.

We noticed at the same time that an enormous quantity of stages and tasks were being generated in the Spark UI — and that it seemed to correlate directly with the number of iterations we were requesting of ALS.

From here, we did a few tests:

  • we tried only changing maxIters to 3 (that worked fine)
  • we tried changing maxIters to 10 (the default) (that worked fine too)
  • we tried changing maxIters to 100 (the original) (errors / failures returned)

Now we’re getting somewhere. So, we started Googling around for ALS Spark Interval StackOverflow RPC errors and found a few discussions on the subject.

What we discovered is that we should have been using two very important configurations before we built and trained our ALS model:

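from pyspark.ml.recommendation import ALS
spark.sparkContext.setCheckpointDir("/some/path/")  # set this before fitting the model
als = ALS(checkpointInterval=some_number)  # checkpoint every some_number iterations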

Here are their definitions:

setCheckpointDir = Set the directory under which RDDs are going to be checkpointed. The directory must be an HDFS path if running on a cluster. (we used /tmp/checkpoint/ in our Notebook on Databricks)

checkpointInterval = Set period (in iterations) between checkpoints (default = 10). Checkpointing helps with recovery (when nodes fail) and StackOverflow exceptions caused by long lineage. It also helps with eliminating temporary shuffle files on disk, which can be important when there are many ALS iterations. If the checkpoint directory is not set in SparkContext, this setting is ignored.

So we finally put two and two together: our maxIters was directly impacting the number of Spark stages and tasks being generated, and that was directly impacting the size and lineage of our Spark application, which was resulting in StackOverflow errors and RPC disconnects!

The sheer volume of stages and tasks was becoming so enormous that the application (i.e. the driver JVM) couldn’t cope. So, we needed to be using checkpointing (i.e. saving RDDs to the checkpoint directory at set intervals, which truncates their lineage) to reduce the burden on our Spark application.
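To make the lineage argument concrete, here is a toy model in plain Python. It is not Spark's internals: the Step class, the steps_per_iter value of 5, and the function names are all invented for illustration. It only shows how a chain of dependencies keeps growing with each iteration unless something periodically cuts it.

```python
class Step:
    """One transformation in a toy lineage chain (not a real Spark RDD)."""

    def __init__(self, parent=None):
        self.parent = parent


def lineage_depth(step):
    """Count how many steps must be walked back to reach the source."""
    depth = 0
    while step is not None:
        depth += 1
        step = step.parent
    return depth


def run_iterations(max_iter, steps_per_iter=5, checkpoint_interval=None):
    """Return the deepest lineage seen while running max_iter toy iterations."""
    current = Step()
    deepest = lineage_depth(current)
    for i in range(1, max_iter + 1):
        # Each iteration stacks a few more transformations on the last result.
        for _ in range(steps_per_iter):
            current = Step(parent=current)
        deepest = max(deepest, lineage_depth(current))
        if checkpoint_interval and i % checkpoint_interval == 0:
            # Checkpoint: keep the result, forget how it was derived.
            current = Step()
    return deepest


print(run_iterations(10))                           # 51
print(run_iterations(100))                          # 501
print(run_iterations(100, checkpoint_interval=2))   # 11
```

In this toy, the chain grows in proportion to the iteration count without checkpointing, while a checkpoint interval of 2 keeps it bounded no matter how many iterations run. Anything that walks such a chain recursively has far less depth to deal with in the second case.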

After we set spark.sparkContext.setCheckpointDir to /tmp/checkpoint and ALS.checkpointInterval to 2, we suffered no more of those errors, even with maxIters at 100. That’s not to say you won’t: it all depends on the number of DataFrames and RDDs you have in play. After all, while Spark is very powerful, it’s not without its need for tuning.
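Put together, the fix might look something like the sketch below. The checkpoint path, interval, and iteration count are the ones mentioned in this post; the CHECKPOINT_DIR and ALS_PARAMS names and the fit_als helper are hypothetical, and the ratings DataFrame is assumed to use ALS's default column names (user, item, rating).

```python
CHECKPOINT_DIR = "/tmp/checkpoint"

ALS_PARAMS = {
    "maxIter": 100,           # the iteration count that originally triggered the errors
    "checkpointInterval": 2,  # checkpoint every 2 iterations
}


def fit_als(spark, ratings):
    """Fit an ALS model on `ratings` with checkpointing enabled."""
    # Imported here so the settings above can be inspected without Spark installed.
    from pyspark.ml.recommendation import ALS

    # Set the checkpoint directory first; per the definition quoted earlier,
    # checkpointInterval is ignored if no directory is set.
    spark.sparkContext.setCheckpointDir(CHECKPOINT_DIR)

    # Assumes `ratings` has the default "user", "item" and "rating" columns.
    return ALS(**ALS_PARAMS).fit(ratings)
```

In a Databricks notebook, where a spark session already exists, this would be called as model = fit_als(spark, ratings_df), with ratings_df standing in for your own DataFrame.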


Writing about big data since 2015. Data Science Engineering Manager at Disney.