Try again, Apache Spark!
It’s been more than a year since I started working on my first Big Data project. In this project we chose Apache Spark as the batch processing framework. Using Apache Spark throws you into a functional, distributed and asynchronous programming world. While you do not have to worry much about the distributed part, because Spark hides most of the problems related to it, the functional and asynchronous parts make error handling more complex. Let’s see why.
Classical error handling
In classic settings, such as in languages like Java or C++, exceptions are used to signal exceptional behaviour of a program.
If you need to handle a particular type of exception, you use a try statement surrounding the statements that can raise the exception.
try {
    // This method can raise an Exception
} catch (Exception e) {
    // Handle the exceptional behaviour in some way
}
In Scala, the above statements become the following.
try {
  // This method can raise an Exception
} catch {
  case e: Exception => // Handle the exceptional behaviour in some way
}
I will not discuss whether it is better to use checked or unchecked exceptions in Java. This is not the aim of this post. The important thing is the locality of the statements that handle the exception and the statements that raise it: the latter are inside the scope of the former.
Asynchronous execution and exception handling
Apache Spark uses RDDs (Resilient Distributed Datasets) as the basic building block for algorithms over huge amounts of data (refer to A new way to err, Apache Spark for a more detailed description).
On an RDD you can perform two types of operations: transformations and actions. Transformations construct a new RDD from a previous one. For example, the map and flatMap operations are transformations.
val lines: RDD[String] = sc.textFile("large_file.txt")
val tokens = lines.flatMap(_ split " ")
Actions compute a result based on an RDD, and either return it to the driver program or save it to an external storage system (such as HDFS, HBase, …).
Finally, when you work with RDDs you have to remember that, although you can define new RDDs at any time, Spark computes them only in a lazy fashion, that is, the first time they are used in an action.
During the processing of a transformation on an RDD, something can go wrong and an exception can be raised. The first attempt to handle the exception is usually to wrap the transformation in a try statement.
val lines: RDD[String] = sc.textFile("large_file.txt")
try {
  val tokens =
    lines.flatMap(_ split " ")
      // This transformation can throw an exception
      .map(s => s(10))
} catch {
  case e: StringIndexOutOfBoundsException =>
    // Do something in response to the exception
}
Unfortunately, as stated above, the code inside the transformation is not actually executed until the first action is reached. So the above code is useless as far as exception handling is concerned. The only thing we can do if we want to catch exceptions raised during transformations on an RDD is to surround the action statements with a try statement.
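val lines: RDD[String] = sc.textFile("large_file.txt")
val tokens =
  lines.flatMap(_ split " ")
    .map(s => s(10))
try {
  // This try-catch block catches all the exceptions thrown by the
  // preceding transformations, which run only when an action is invoked.
  tokens.count() // any action would do; count() is used here only as an example
} catch {
  // Note: on the driver, Spark typically reports a failed task wrapped in a
  // SparkException, so in practice you may need to match on that type instead.
  case e: StringIndexOutOfBoundsException =>
    // Do something in response to the exception
}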
As you can see, we have completely lost the locality of the exception handling process.
Using this approach we lose track of which element threw the exception. Furthermore, Spark is used to process huge amounts of data: are we sure we want to block the whole execution for a single error on a single element of an RDD?
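The effect of lazy evaluation on try-catch can be reproduced without a cluster. The following is only a sketch: it uses a plain Scala Iterator as a stand-in for an RDD (both evaluate map lazily), and the object and value names are made up for illustration.

```scala
object LazyTryDemo {
  val words = List("a-long-enough-word", "short")

  // Wrapping the *definition* of a lazy transformation: nothing is
  // evaluated yet, so this catch block never fires.
  var caughtAtDefinition = false
  val chars: Iterator[Char] =
    try {
      words.iterator.map(s => s(10))
    } catch {
      case _: StringIndexOutOfBoundsException =>
        caughtAtDefinition = true
        Iterator.empty
    }

  // Wrapping the *consumption* (the analogue of a Spark action):
  // the exception surfaces here, far from the code that caused it.
  var caughtAtConsumption = false
  val result: List[Char] =
    try {
      chars.toList
    } catch {
      case _: StringIndexOutOfBoundsException =>
        caughtAtConsumption = true
        Nil
    }
}
```

Note that the single failing element ("short") makes us lose the whole result, including the element that was processed correctly.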
Functional programming and exception handling
A first attempt to solve the last problem can be to move the try statement inside the transformation. The above code then becomes the following.
// ...omitted...
val tokens =
  lines.flatMap(_ split " ")
    .map {
      s =>
        try {
          s(10)
        } catch {
          case e: StringIndexOutOfBoundsException =>
            // What the hell can we return in this case?
        }
    } // end of map
Eureka! We have regained locality! Unfortunately, doing it this way, we run into a new problem. Remember: a transformation constructs a new RDD from a previous one. The original type of the function passed to the map transformation was String => Char. To maintain this signature, we have to return a Char in the case clause as well. What should we choose? An empty character? A special character? Whatever we choose, it can lead to something going wrong sooner or later.
What can we do? The only way out of this impasse is to turn to monads. More or less, a monad is a generic container that enhances a simple type with additional properties. Scala offers at least three different types of monads that help us deal with exceptional situations:
Option[T] and its two subclasses, Some[T] and None. This monad acts like a list of zero or one elements, and we can use it when we are not interested in the details of the error situation we may encounter.
Either[T, K] and its two subclasses, Left[T] and Right[K]. This monad lets you return two different types of objects, T and K, respectively in the case of an exceptional behaviour of the process and in the case of a correct behaviour.
Try[T] and its two subclasses, Success[T] and Failure[T]. This monad is similar to Either. Instead of using a generic type T for the Left subclass, Failure always uses a type that is a subclass of Throwable. The Try type is available in Scala since version 2.10.
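To make the difference between the three containers concrete, here is a minimal sketch of the same "character at index 10" lookup written in each style. The helper names (charOption, charEither, charTry) and the error message are hypothetical, chosen only for this illustration.

```scala
import scala.util.Try

object ContainersDemo {
  // Option: we only learn *that* something went wrong.
  def charOption(s: String): Option[Char] =
    if (s.length > 10) Some(s(10)) else None

  // Either: the Left side carries an error description of our choice.
  def charEither(s: String): Either[String, Char] =
    if (s.length > 10) Right(s(10))
    else Left("'" + s + "' is shorter than 11 characters")

  // Try: the failure side carries the Throwable that was raised.
  def charTry(s: String): Try[Char] = Try(s(10))
}
```

In all three cases the function has a single, honest return type, so there is no need to invent a sentinel Char for the error case.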
So, if your aim is to keep track of the exceptions raised during the processing of an RDD while continuing to process only the values that are not in error, the Try[T] monad suits your needs perfectly. This type comes with a useful apply factory method on its companion object, which lets you build a Success or Failure object directly from the result of a computation.
import scala.util.Try

// ...omitted...
val tokens =
  lines.flatMap(_ split " ")
    .map(s => Try(s(10)))
If the computation produces a value, then an object of type Success[T] is built; otherwise a Failure object is built. Both types are immutable. The Failure type gives access to its exception attribute, which contains the error raised during the computation.
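As a small illustration that needs no Spark at all, the sketch below builds one Success and one Failure with Try.apply and inspects them by pattern matching. The object name and the describe helper are made up for this example.

```scala
import scala.util.{Try, Success, Failure}

object TryApplyDemo {
  // The computation succeeds: the value is wrapped in a Success.
  val ok: Try[Char] = Try("0123456789X"(10))

  // The computation throws: the exception is wrapped in a Failure.
  val ko: Try[Char] = Try("short"(10))

  // Pattern matching gives access to the value or to the exception.
  def describe(t: Try[Char]): String = t match {
    case Success(c) => "value: " + c
    case Failure(e) => "error: " + e.getClass.getSimpleName
  }
}
```

Keep in mind that Try.apply captures only non-fatal exceptions; fatal errors such as OutOfMemoryError are still thrown.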
So, your RDD[T] becomes an RDD[Try[T]]. Using this trick, we can now use the same data structure to carry both data and exceptions. That’s great!
Chaining operations
OK, we have an RDD[Try[T]]… now what? How can we work with instances of Try[T]? Do you remember when I said that this type is a monad? Then we can use the map and flatMap methods to work with it effectively.
If you have to transform the content of a Try object, you can apply the map method. The function is applied only to instances of the Success type, while Failure instances are passed through the transformation unchanged.
If you start with a Try[A], you end up with a Try[B]. What if your transformation can itself produce a Try? Using the map method you would obtain a Try[Try[B]]. Well, I say to you: “Use flatMap instead!”
The flatMap method flattens the nested type by one level, simply returning a Try[B].
// ...omitted...
lines.flatMap(_ split " ")
  // Using flatMap on the Try, the final type is RDD[Try[Char]]
  // and not RDD[Try[Try[Char]]]
  .map(s => Try(s(10)).flatMap(_ => Try(s(20))))
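The difference between map and flatMap on a single Try can be checked with plain Scala. This is a sketch with hypothetical helpers (parse and reciprocal), not code from the project described in this post.

```scala
import scala.util.Try

object TryChainingDemo {
  // Two hypothetical computations that can both fail.
  def parse(s: String): Try[Int] = Try(s.toInt)
  def reciprocal(n: Int): Try[Int] = Try(100 / n)

  // map with a plain function: Try[Int] => Try[Int]
  val mapped: Try[Int] = parse("5").map(_ + 1)

  // map with a Try-returning function: the result is nested
  val nested: Try[Try[Int]] = parse("5").map(reciprocal)

  // flatMap with the same function: the result is flattened
  val flat: Try[Int] = parse("5").flatMap(reciprocal)

  // A Failure is passed through untouched; the function is never called
  val stillFailed: Try[Int] = parse("five").map(_ + 1)

  // The second step can fail too, and flatMap keeps that failure
  val divisionByZero: Try[Int] = parse("0").flatMap(reciprocal)
}
```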
Returning to our initial RDD[Try[T]], we now have all the information we need to work with it. In my opinion, the best way to work with this type is the for-comprehension. Let’s have some dirty fun right now.
A for-comprehension in Scala is syntactic sugar for a series of applications of flatMap and map (and possibly some filter function). For example, take the following statements.
// rdd has type RDD[Try[Int]]
val rdd = sc.parallelize((0 to 10).toList).map(i => Try(i / 0))
val result = rdd.map(i => i.flatMap(j => Try(j * 6))
  .flatMap(z => Try(z + 3))
  .map(k => k + 42))
Using a for-comprehension, the above code can be simplified and rewritten as follows:
// rdd has type RDD[Try[Int]]
val result = rdd.map {
  i =>
    for {
      j <- i
      z <- Try(j * 6) // flatMap
      k <- Try(z + 3) // flatMap
    } yield {
      k + 42 // map
    }
}
It is more readable, isn’t it?
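If you want to convince yourself that the two forms are equivalent, the following sketch applies both to a single Try[Int], outside of Spark. The names chained, comprehension and divide are invented for this example.

```scala
import scala.util.Try

object ForComprehensionDemo {
  // Explicit chain of flatMap and map calls.
  def chained(i: Try[Int]): Try[Int] =
    i.flatMap(j => Try(j * 6))
      .flatMap(z => Try(z + 3))
      .map(k => k + 42)

  // The same pipeline written as a for-comprehension.
  def comprehension(i: Try[Int]): Try[Int] =
    for {
      j <- i
      z <- Try(j * 6)
      k <- Try(z + 3)
    } yield k + 42

  // Hypothetical helper producing a Success or a Failure.
  def divide(a: Int, b: Int): Try[Int] = Try(a / b)
}
```

For example, ForComprehensionDemo.divide(2, 2) goes through both pipelines as 1 * 6 + 3 + 42, while divide(1, 0) stays a Failure in both.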
Collecting the results
At some point in your program you will need to get only the Success or only the Failure instances inside the RDD[Try[T]].
There are many techniques to accomplish this task, but the one I prefer is to use the collect transformation.
Unlike the action of the same name, this method accepts a partial function as input and builds a new collection by applying the partial function to all the elements on which the function is defined.
// successes has type RDD[Int], no more Try monad
val successes =
  rdd.collect {
    // The partial function is applied only to elements of type Success.
    case Success(x) => x
  }
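The same technique works for the failures. The sketch below uses a plain List as a stand-in for the RDD, since Scala collections offer a collect method that also takes a partial function; the sample data is invented for illustration.

```scala
import scala.util.{Try, Success, Failure}

object CollectDemo {
  // A List[Try[Int]] standing in for an RDD[Try[Int]].
  val results: List[Try[Int]] =
    List("1", "two", "3").map(s => Try(s.toInt))

  // Keep only the values computed correctly.
  val successes: List[Int] =
    results.collect { case Success(x) => x }

  // Keep only the exceptions, for example to log or count them.
  val failures: List[Throwable] =
    results.collect { case Failure(e) => e }
}
```

This way a single bad element no longer blocks the whole computation: the good values go on, and the errors are still available for inspection.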
And that’s all, folks!