
Technology | Apr 18, 2017

Mastering Scala: Futures

Micah Jones

Writing good multithreaded code is essential to taking full advantage of modern computer hardware. It can also be very difficult, particularly when reasoning about thread safety. Declarative programming sidesteps much of the thread safety problem by favoring immutable values, whose state cannot change. If multiple threads can’t modify the same data, then they can run independently of one another, with no need for resource locks. Scala encourages us to write in a declarative, functional manner, and thereby lends itself to highly multithreaded programs.

Futures are the standard mechanism for writing multithreaded code in Scala. Whenever we create a new Future, Scala schedules that Future’s code to run on a thread from the execution context’s pool, and after completion it executes any provided callbacks.

In order to use Futures, Scala requires us to provide an implicit execution context, which controls the thread pool in which Futures execute. We can create our own execution contexts, but importing the default one usually suffices. To run the code examples below, you will need the following two imports:

scala> import scala.concurrent.Future
scala> import scala.concurrent.ExecutionContext.Implicits.global
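When the default context is not enough, a custom one can be built from a Java thread pool. The following is a minimal sketch, not a recommended configuration: the object name, the helper addOnCustomPool, and the pool size of four are all assumptions made for illustration. Await appears only so the demo can return a plain value.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object CustomContextExample {
  // Hypothetical helper: runs x + y on a dedicated four-thread pool
  // instead of the global execution context.
  def addOnCustomPool(x: Int, y: Int): Int = {
    val pool = Executors.newFixedThreadPool(4)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // Blocking here is for demonstration only.
      Await.result(Future { x + y }, 5.seconds)
    } finally {
      pool.shutdown() // release the pool's threads
    }
  }

  def main(args: Array[String]): Unit =
    println(addOnCustomPool(2, 3)) // prints 5
}
```

Any Future created while this implicit value is in scope runs on the custom pool rather than the global one.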

Creating a Future is easy: We need only enclose a block of code as an argument to Future. For example, to create a threaded function that will add two numbers together:

scala> def add(x:Int,y:Int) = Future { x + y }
add: (x: Int, y: Int) Future[Int]

Scala will infer that add has a return type of Future[Int], and the enclosed code will execute asynchronously on another thread when the function is called.

Callbacks

When a Future completes, we typically want to do something with its result. Scala allows us to define callback functions, which execute upon a Future’s success or failure. In the meantime, the thread that created the Future is unblocked and may continue to execute.

The most basic callback is onComplete:

scala> add(2,3).onComplete(result => println(result))

Success(5)

After the Future add(2,3) completes successfully, the onComplete callback receives a Success object containing the result of computing x+y. To use that inner result in our code, we can extract it with pattern matching:

scala> import scala.util.{Success, Failure}

scala> add(2,3).onComplete {
     |   case Success(n) => println(n)
     |   case Failure(e) => println(e.getMessage)
     | }
5

Alternatively, we can create separate callbacks that fire specifically for successes or for failures:

scala> add(2,3).onSuccess { case n => println(n) }
5

scala> val errorFuture = Future { throw new Exception("Mission Failed") }
scala> errorFuture.onFailure { case e => println(e.getMessage) }
Mission Failed

Note that onSuccess and onFailure take partial functions as arguments, so we still need to use pattern matching to extract their contents. Both methods are deprecated as of Scala 2.12 and were removed in Scala 2.13.
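On Scala 2.12 and later, where onSuccess and onFailure are deprecated (and removed in 2.13), similar behavior is available through foreach for successes and failed.foreach for failures. The sketch below is one way to write it; the object and helper names are hypothetical, and the Promise and Await calls are demo plumbing used only to hand each callback's value back to the caller.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ForeachExample {
  def add(x: Int, y: Int): Future[Int] = Future { x + y }

  // Hypothetical helper: captures what a success-only callback sees.
  def successValue(): Int = {
    val seen = Promise[Int]()
    add(2, 3).foreach(n => seen.success(n)) // fires only on success
    Await.result(seen.future, 5.seconds)
  }

  // Hypothetical helper: captures what a failure-only callback sees.
  def failureMessage(): String = {
    val seen = Promise[String]()
    val errorFuture: Future[Int] = Future { throw new Exception("Mission Failed") }
    // `failed` succeeds with the exception when the original Future fails.
    errorFuture.failed.foreach(e => seen.success(e.getMessage))
    Await.result(seen.future, 5.seconds)
  }

  def main(args: Array[String]): Unit = {
    println(successValue())   // prints 5
    println(failureMessage()) // prints Mission Failed
  }
}
```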

We can also just use map, which will execute its assigned function if the Future succeeds:

scala> add(2,3).map { n => println(n) }
5

Scala does provide an Await object for blocking the current thread until a Future completes, but its use is highly discouraged as it limits the efficiency advantages of multithreading, prevents freeing of system resources, and introduces the possibility of deadlock. For example, to await the result of a Future with a timeout of 10 seconds:

scala> import scala.concurrent.Await
scala> import scala.concurrent.duration._
scala> val sum = Await.result(add(1,2), 10 seconds)
sum: Int = 3

If the Future does not complete before a timeout occurs, Await will throw a TimeoutException.
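To make the timeout behavior concrete, here is a small sketch that catches the TimeoutException and substitutes a default value. The helper awaitOrDefault, its parameters, and the sleep durations are illustrative assumptions, not part of any library.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AwaitTimeoutExample {
  // Hypothetical helper: waits up to limitMillis for a Future that takes
  // roughly workMillis to produce 42, falling back to `default` on timeout.
  def awaitOrDefault(workMillis: Long, limitMillis: Long, default: Int): Int = {
    val slow = Future { Thread.sleep(workMillis); 42 }
    try Await.result(slow, limitMillis.millis)
    catch { case _: TimeoutException => default }
  }

  def main(args: Array[String]): Unit = {
    println(awaitOrDefault(workMillis = 10, limitMillis = 2000, default = -1))  // prints 42
    println(awaitOrDefault(workMillis = 1000, limitMillis = 100, default = -1)) // prints -1
  }
}
```

Note that a timeout only stops the waiting; the Future itself keeps running in the background until its code finishes.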

Combining Futures

Sometimes we need to chain multiple Futures together, such as when we need to submit the result of one Future operation as a parameter to another Future operation. For example, suppose we want to take the result of the add operation above and then double that result in a new thread:

scala> def double(n:Int):Future[Int] = Future { n*2 }
double: (n: Int) Future[Int]

scala> def addThenDouble(x:Int,y:Int) = add(x,y).map(sum => double(sum))
addThenDouble: (x: Int, y: Int) Future[Future[Int]]

By mapping a Future’s result to another Future, we’ve ended up with a return type of Future[Future[Int]]. We can instead use flatMap to eliminate nested Futures:

scala> def addThenDouble(x:Int,y:Int) = add(x,y).flatMap(sum => double(sum))
addThenDouble: (x: Int, y: Int) Future[Int]

scala> addThenDouble(2,3).map(result => println(result))
10

Alternatively, we might want to run multiple futures independently and then collect their results. Scala’s for-yield construct is an ideal solution:

scala> for {
     |   sum1 <- add(1,2)
     |   sum2 <- add(3,4)
     |   sum3 <- add(5,6)
     | } yield {
     |   println((sum1,sum2,sum3))
     | }
(3,7,11)

However, the code above does not actually run the Futures in parallel: each Future is created only after the previous one has completed, so they execute in sequence. Internally, Scala converts the for-yield into an equivalent chain of nested flatMap and map calls:

scala> add(1,2).flatMap(sum1 =>
     |   add(3,4).flatMap(sum2 =>
     |     add(5,6).map(sum3 => {
     |       println((sum1,sum2,sum3))
     |     })))
(3,7,11)

If we want to independently execute our futures in parallel and then collect their results, we must declare our Futures and thereby start their execution prior to the for-yield:

scala> {
     |   val sum1Future = add(1,2)
     |   val sum2Future = add(3,4)
     |   val sum3Future = add(5,6)
     |   for {
     |     sum1 <- sum1Future
     |     sum2 <- sum2Future
     |     sum3 <- sum3Future
     |   } yield {
     |     println((sum1,sum2,sum3))
     |   }
     | }
(3,7,11)

The code on the right-hand side of the yield will only execute once all provided Futures have succeeded. If any of them fail, it will not execute at all.
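The difference between the two for-yield styles is easier to see with timings. The sketch below is an illustration under stated assumptions: slowAdd is a hypothetical variant of add that sleeps for 200 ms, timed is a hypothetical helper, and a dedicated three-thread pool is used so that the three sleeps can overlap. Exact timings will vary by machine.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ForYieldTiming {
  // Hypothetical slow variant of add: sleeps 200 ms before returning the sum.
  def slowAdd(x: Int, y: Int)(implicit ec: ExecutionContext): Future[Int] =
    Future { Thread.sleep(200); x + y }

  // Returns (result, elapsed milliseconds) for a Future-producing block.
  def timed[A](body: => Future[A]): (A, Long) = {
    val start = System.nanoTime()
    val result = Await.result(body, 10.seconds)
    (result, (System.nanoTime() - start) / 1000000)
  }

  // Returns the (result, millis) pairs for the sequential and parallel styles.
  def run(): ((Int, Long), (Int, Long)) = {
    val pool = Executors.newFixedThreadPool(3)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // Each Future is created only after the previous one completes.
      val sequential = timed {
        for {
          a <- slowAdd(1, 2)
          b <- slowAdd(3, 4)
          c <- slowAdd(5, 6)
        } yield a + b + c
      }
      // All three Futures are started before the for-yield begins.
      val parallel = timed {
        val f1 = slowAdd(1, 2)
        val f2 = slowAdd(3, 4)
        val f3 = slowAdd(5, 6)
        for { a <- f1; b <- f2; c <- f3 } yield a + b + c
      }
      (sequential, parallel)
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    val (sequential, parallel) = run()
    println(s"sequential: result=${sequential._1}, millis=${sequential._2}")
    println(s"parallel:   result=${parallel._1}, millis=${parallel._2}")
  }
}
```

Both styles produce the same sum; the sequential version should take roughly three sleeps' worth of time, while the parallel version should take closer to one.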

Another common approach is to just collect multiple Futures into a List:

scala> List(add(1,2),add(3,4),add(5,6))
List[Future[Int]]

We might want to take the resulting List[Future[Int]] and convert it to a Future[List[Int]], allowing us to reason about the full collection of Future results. In this case we will need to use Future.sequence, which converts a list of Futures into one aggregate Future:

scala> val listOfFutures = List(add(1,2),add(3,4),add(5,6))
listOfFutures: List[Future[Int]]

scala> Future.sequence(listOfFutures).map(println(_))
List(3, 7, 11)

scala> Future.sequence(listOfFutures).map(list => println(list.sum))
21

Note that because the Futures in the List each begin execution prior to being submitted to Future.sequence, they will all run in parallel even though the output List retains proper ordering. Also, if any Future in the List fails, the resulting aggregate Future will fail.
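The failure behavior of the aggregate Future can be checked with a short sketch. The helper sumAll and the failing Future are assumptions made for illustration; Await.ready is used here because it waits for completion without rethrowing the exception, which lets us inspect the outcome as a Try.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object SequenceFailureExample {
  def add(x: Int, y: Int): Future[Int] = Future { x + y }

  // A Future that always fails, mirroring errorFuture from earlier.
  def failing: Future[Int] = Future { throw new Exception("Mission Failed") }

  // Hypothetical helper: sums a list of Futures and reports the outcome as a Try.
  def sumAll(futures: List[Future[Int]]): Try[Int] = {
    val total: Future[Int] = Future.sequence(futures).map(_.sum)
    // A Future that has completed always has a defined value.
    Await.ready(total, 5.seconds).value.get
  }

  def main(args: Array[String]): Unit = {
    println(sumAll(List(add(1, 2), add(3, 4), add(5, 6)))) // prints Success(21)
    println(sumAll(List(add(1, 2), failing, add(5, 6))))   // prints a Failure wrapping the exception
  }
}
```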

Handling Failures

Because a Future runs in a thread outside of the one in which it is defined, we cannot capture its exceptions with a normal try-catch block. Therefore, the following code will not work as desired:

scala> try { errorFuture } catch { case e => println(e) }

If you run this code, the exception raised inside errorFuture will not reach the catch block: it occurs in a separate thread and is captured inside the failed Future rather than thrown to the caller.

Instead, we can use recover as a callback to handle any exceptions thrown from within our Future:

scala> errorFuture recover { case e:Exception => println(e) }
java.lang.Exception: Mission Failed
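Beyond printing the exception, recover can also substitute a replacement value, so that the resulting Future succeeds. A minimal sketch follows, assuming hypothetical helpers divide and safeDivide and an arbitrary default of 0:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object RecoverExample {
  // Integer division by zero throws ArithmeticException inside the Future.
  def divide(x: Int, y: Int): Future[Int] = Future { x / y }

  // Hypothetical helper: substitutes 0 when the division fails.
  def safeDivide(x: Int, y: Int): Future[Int] =
    divide(x, y).recover { case _: ArithmeticException => 0 }

  def main(args: Array[String]): Unit = {
    // Await is used only to print the results in this demo.
    println(Await.result(safeDivide(10, 2), 5.seconds)) // prints 5
    println(Await.result(safeDivide(1, 0), 5.seconds))  // prints 0
  }
}
```

Exceptions that do not match any case in the partial function are left in place, so the resulting Future still fails with them.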

Alternatively, recoverWith allows us to execute another Future within our exception handler:

scala> (errorFuture recoverWith { case e => add(1,2) }).map(result => println(result))
3

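In this case we don’t really care about the specific exception a Future throws, and so we don’t benefit from the pattern matching logic above. Instead, we can use fallbackTo, which substitutes the result of another Future if our original one fails. Because the fallback is passed as an ordinary argument, it begins executing immediately, whether or not the original Future fails: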

scala> (errorFuture fallbackTo add(1,2)).map(result => println(result))
3
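One practical difference between fallbackTo and recoverWith is when the alternative Future gets created. The argument to fallbackTo is evaluated up front, while the handler passed to recoverWith runs only on failure. The sketch below counts how often a hypothetical fallback() helper is invoked when the primary Future succeeds; all names here are illustrative assumptions.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FallbackExample {
  // Returns (result, fallback calls after fallbackTo, fallback calls after recoverWith).
  def run(): (Int, Int, Int) = {
    val calls = new AtomicInteger(0)
    // Hypothetical fallback: counts each time it is asked to create a Future.
    def fallback(): Future[Int] = { calls.incrementAndGet(); Future { 0 } }
    def primary: Future[Int] = Future { 1 + 2 } // always succeeds

    // fallbackTo evaluates its argument eagerly, so fallback() is called here.
    val result = Await.result(primary fallbackTo fallback(), 5.seconds)
    val afterFallbackTo = calls.get

    // recoverWith only calls fallback() if primary fails, which it does not.
    Await.result(primary recoverWith { case _ => fallback() }, 5.seconds)
    val afterRecoverWith = calls.get

    (result, afterFallbackTo, afterRecoverWith)
  }

  def main(args: Array[String]): Unit =
    println(run()) // prints (3,1,1)
}
```

If the fallback work is expensive, recoverWith may be the better fit, since it avoids starting that work unless it is needed.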

Conclusion

Multithreaded code can be challenging to write, but it is essential to modern software development. Functional programming, with its commitment to immutability, lends itself to clean, efficient parallelization with minimal blocking logic. Scala’s Future libraries allow us to easily write highly multithreaded code, providing us with a variety of powerful tools for combining threads, reasoning about their results, and handling failures.

Micah Jones is a consultant in Credera’s Integration & Data Services (IDS) practice. In 2011, he received his doctorate in computer science from the University of Texas at Dallas, specializing in programming language-based security.
