Akka Streams: error handling in event processing pipelines

A short read about being careful with error handling in stream design.

April 03, 2019 · scala akka error handling


Akka Streams is a reasonable choice when it comes to event processing. A wide variety of connectors and integrations makes it easy to assemble a working pipeline and get up to speed quickly.

On the other hand, we have a long list of predefined operators, GraphDSL, support for error handling, the possibility of rolling out custom stages, and more. This means that even with only a rough idea of what we want to achieve, there are always several ways to approach the stream design. It’s quite easy to assemble a working graph. But making the code scalable, maintainable and easily extendable usually takes experimentation with different approaches, especially when building a non-trivial processing pipeline.

One hint that might help to narrow down the choices is starting with types. If you’re not sure how to design this Flow or that Sink, think about what could be a reasonable type. In other words, think about how to express the role of this stage in types. This usually helps when facing multiple choices and can guide you to the final design.

One of the most fundamental things when designing a stream is error handling, and there are multiple ways to approach it in Akka Streams. Now, having all of this in mind, let’s see how to iteratively tackle this problem in a real-world example.

Example

In the following example, we’re using Akka Streams 2.5.21 to model an event processing pipeline. The stream consists of the following stages:

As you might notice, each stage might fail. And this is where error handling is important to get right. Otherwise, it might negatively impact the overall stream design, which you’ll see next.

Iteration #1

Let’s start from the beginning. Our stream begins with a Source of events:

val source: Source[Event, NotUsed] = ???

In the next step, we’d probably want to decode each event to give it a domain meaning. Let’s use circe for it.

val decodingFlow: Flow[Event, Either[Error, OrderShipped], NotUsed] =
    Flow.fromFunction(event => decode[OrderShipped](event.content))

So far so good. Since the types align, our stages are composable:

val decodingSource: Source[Either[Error, OrderShipped], NotUsed] = 
    source.via(decodingFlow)

Let’s now implement the validation flow:

val validatingFlow: Flow[Either[Error, OrderShipped], Either[Error, OrderShipped], NotUsed] = ???

To make this stage composable, it has to accept Either[Error, OrderShipped] on its input. Let’s pause here and think. Why would it accept Either on its input? What would it actually mean?

Going forward with this idea, every stage would more or less look like the following:

val flow = Flow[Either[Error, OrderShipped]]
    .map {
        case Right(event) => ??? // actual business logic here
        case Left(_)      => ??? // pass downstream
    }

Now every flow in our stream does two things:

And it’s bad if all our stages, by a wrong design decision, do two things instead of one.

Iteration #2 — better types

Let’s improve! In the second iteration, let’s think about what the reasonable types could be. The first iteration taught us that the flows shouldn’t accept Eithers. It’d be better if each flow accepted a simple value and returned the result of processing it. With this in mind, we could start with the following types:

val source: Source[Event, NotUsed]
val decodingFlow: Flow[Event, Either[ProcessingError, OrderShipped], NotUsed]
val validatingFlow: Flow[OrderShipped, Either[ProcessingError, OrderShipped], NotUsed]
val persistingFlow: Flow[OrderShipped, Either[ProcessingError, OrderShipped], NotUsed]

Each flow is now simpler. It accepts a single element, does the processing, and returns the result. Looks way better than in the first iteration. The ProcessingError could be defined as follows:

sealed trait ProcessingError
case class DecodingError(...)   extends ProcessingError
case class ValidationError(...) extends ProcessingError
case class PersistenceError(...) extends ProcessingError
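To make the shape of such a stage concrete, here is a minimal sketch of what a validating flow could look like. The fields of OrderShipped, the reason field of ValidationError and the quantity rule are assumptions made up for illustration; the article does not show them.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow

// Hypothetical domain model: these fields are assumptions, not the article's definitions.
final case class OrderShipped(orderId: String, quantity: Int)

sealed trait ProcessingError
final case class ValidationError(reason: String) extends ProcessingError

object Validation {
  // Plain function: one simple value in, one result out.
  def validate(order: OrderShipped): Either[ProcessingError, OrderShipped] =
    if (order.quantity > 0) Right(order)
    else Left(ValidationError(s"non-positive quantity in order ${order.orderId}"))

  // The flow only lifts the function into the stream.
  // It never has to inspect an Either on its input.
  val validatingFlow: Flow[OrderShipped, Either[ProcessingError, OrderShipped], NotUsed] =
    Flow.fromFunction(validate)
}
```

Keeping the logic in a plain function also means it can be unit tested without materializing a stream.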

Unfortunately, we can no longer wire those stages together easily, because the types do not align. This means that the following will not compile anymore:

source
  .via(decodingFlow) // out: Either[ProcessingError, OrderShipped]
  .via(validatingFlow) // in: OrderShipped
  .via(persistingFlow)
  .to(sink)

What we’d want now is to plug in an error handling sink immediately after each stage. In other words, if the output of the flow is a Left, we’d send it to the error handling sink. Otherwise, we’d push the successful element downstream to the next stage. This can be depicted in the following diagram:

[Diagram: Iteration #2, better types]

In this way, we can apply custom error handling for each type of error:

One way to wire the stream to implement the above idea is to use flatMapConcat, which has the following signature:

def flatMapConcat[T, M](f: Out => Graph[SourceShape[T], M]): Repr[T]

in this way:

val graph: RunnableGraph[NotUsed] = source
  .via(decodingFlow)
  .flatMapConcat {
    case l @ Left(_) => Source.single(l)
    case Right(orderShipped) =>
      Source
        .single(orderShipped)
        .via(validatingFlow)
        .flatMapConcat {
          case l @ Left(_) => Source.single(l)
          case Right(orderShipped) =>
            Source
              .single(orderShipped)
              .via(persistingFlow)
        }
  }
  .to(sink)

val sink: Sink[Either[ProcessingError, OrderShipped], NotUsed] = ??? // handle all errors

This could potentially work. But this approach to wiring the graph is cumbersome. If the graph consisted of more processing stages, the “tree” of branches would keep growing. It’s a clear indication that this approach doesn’t scale, and it reminds me somewhat of the Pyramid of Doom.

Let’s now think about how to improve on the graph wiring itself.

Iteration #3 — better wiring

We’re getting closer to a better design. We know that we want to immediately plug the error handling sink after each stage if an error occurs. What we want to achieve can be depicted in the following diagram:

[Diagram: Iteration #3, better wiring]

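Now we have almost all the pieces in place except one: the stage, still unknown (???), that sends failures to the error handling sink and passes successes downstream.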

It turns out that such a stage can be easily created with the Partition graph operator. Fortunately, it also turns out that such a stage is generic enough to be already defined in Akka Streams. It’s called divertTo.
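For comparison, a hand-rolled version built on Partition might look like the sketch below. The names PartitionSketch and divertLeft are made up for illustration, and this is only one possible wiring rather than how the library must be used: port 0 of the partition carries failures to the error sink, port 1 carries successes downstream.

```scala
import akka.NotUsed
import akka.stream.{FlowShape, Graph, SinkShape}
import akka.stream.scaladsl.{Flow, GraphDSL, Partition}

object PartitionSketch {
  // Hypothetical helper: routes Lefts to errorSink and emits unwrapped Rights.
  def divertLeft[L, R](errorSink: Graph[SinkShape[Either[L, R]], _]): Flow[Either[L, R], R, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      // Two output ports: 0 for failures, 1 for successes.
      val partition = builder.add(Partition[Either[L, R]](2, e => if (e.isLeft) 0 else 1))
      // Unwrap the successful elements before handing them downstream.
      val rights = builder.add(Flow[Either[L, R]].collect { case Right(r) => r })

      partition.out(0) ~> errorSink
      partition.out(1) ~> rights.in

      FlowShape(partition.in, rights.out)
    })
}
```

The built-in operator saves us from writing this by hand. Its signature is: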

def divertTo(
    that: Graph[SinkShape[Out], _],
    when: Out => Boolean
): Repr[Out]

The documentation says:

Each upstream element will either be diverted to the given sink, or the downstream consumer according to the predicate function applied to the element.

Which is exactly what we want. With divertTo, the stream could be wired in the following way:

val source: Source[Event, NotUsed] = ???
val decodingFlowDiverted =
  decodingFlow.divertTo(errorHandlingSink, _.isLeft).collect { case Right(e) => e }
val validatingFlowDiverted =
  validatingFlow.divertTo(errorHandlingSink, _.isLeft).collect { case Right(e) => e }
val persistingFlowDiverted =
  persistingFlow.divertTo(errorHandlingSink, _.isLeft).collect { case Right(e) => e }

val graph: RunnableGraph[NotUsed] =
  source
    .via(decodingFlowDiverted)
    .via(validatingFlowDiverted)
    .via(persistingFlowDiverted)
    .to(sink)

and this time it compiles. As you might notice, there’s a repeating part:

.divertTo(errorHandlingSink, _.isLeft).collect { case Right(e) => e }

so we can introduce a small improvement here as well:

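import akka.stream.{Graph, SinkShape}
import akka.stream.scaladsl.Flow

object FlowOps {
  implicit class FlowEitherOps[A, L, R, Mat](flow: Flow[A, Either[L, R], Mat]) {

    // The sink's materialized value is left unconstrained (as in divertTo itself),
    // so sinks such as Sink.foreach can be passed in as well.
    def divertLeft(to: Graph[SinkShape[Either[L, R]], _]): Flow[A, R, Mat] =
      flow.via {
        Flow[Either[L, R]]
          .divertTo(to, _.isLeft)
          .collect { case Right(element) => element }
      }
  }
}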

and the wiring is now simplified even more:

import FlowOps._
val source: Source[Event, NotUsed] = ???
val decodingFlowDiverted: Flow[Event, OrderShipped, NotUsed] =
  decodingFlow.divertLeft(to = errorHandlingSink)
val validatingFlowDiverted: Flow[OrderShipped, OrderShipped, NotUsed] =
  validatingFlow.divertLeft(to = errorHandlingSink)
val persistingFlowDiverted: Flow[OrderShipped, OrderShipped, NotUsed] =
  persistingFlow.divertLeft(to = errorHandlingSink)

val graph: RunnableGraph[NotUsed] =
  source
    .via(decodingFlowDiverted)
    .via(validatingFlowDiverted)
    .via(persistingFlowDiverted)
    .to(sink)

It’s also important to emphasize that each errorHandlingSink can of course be different, and hence apply a different error handling strategy.
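As an illustration of that last point, the sketch below runs a tiny two-stage stream in which each stage diverts its failures to its own sink. Everything in it (DemoError, parse, checkRange, and the two println-based sinks) is a hypothetical stand-in rather than the article’s domain code, and it uses ActorMaterializer as in Akka 2.5.x.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object PerStageSinksDemo {
  // Hypothetical error types standing in for the ProcessingError hierarchy.
  sealed trait DemoError
  final case class ParseError(input: String) extends DemoError
  final case class RangeError(value: Int) extends DemoError

  def parse(s: String): Either[DemoError, Int] =
    Try(s.toInt).toOption.toRight(ParseError(s))

  def checkRange(n: Int): Either[DemoError, Int] =
    if (n >= 0) Right(n) else Left(RangeError(n))

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("per-stage-sinks")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    // One sink per stage, each with its own (here trivial) strategy.
    val parseErrors =
      Sink.foreach[Either[DemoError, Int]](e => println(s"skipping unparsable input: $e"))
    val rangeErrors =
      Sink.foreach[Either[DemoError, Int]](e => println(s"reporting out-of-range value: $e"))

    val result = Source(List("1", "x", "-5", "7"))
      .map(parse)
      .divertTo(parseErrors, _.isLeft)
      .collect { case Right(n) => n }
      .map(checkRange)
      .divertTo(rangeErrors, _.isLeft)
      .collect { case Right(n) => n }
      .runWith(Sink.seq)

    try Await.result(result, 5.seconds)
    finally system.terminate()
  }
}
```

Here "x" ends up in the first sink and -5 in the second, so only 1 and 7 reach the final Sink.seq. Note that divertTo discards the materialized value of the diverted sink, which is why the sinks in this sketch only produce side effects.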

Summary

Akka Streams gives a solid foundation for building data-intensive pipelines. It comes with a wide variety of building blocks, and precisely because of that, it can be easy to shoot yourself in the foot.

When designing a stream, there’s no single best way to do it. You usually explore different possibilities, try, and fail. Working through the problem of error handling took us three iterations, but we ultimately landed on an acceptable design.

There are three general tips that can be taken from this lesson:

Thanks for reading!

Resources