This post suggests a way of handling exceptions thrown in an akka-stream. It presents a custom-made GraphStage for getting hold of a failing element in an akka-stream. It is basically a fancy wrapper for a try-catch block.
Last summer I had the issue that some upstream elements on an MQ (source queue) were not compatible with a schema used for transformation within an akka-stream. This caused the stream to fail. However, there were supervision mechanisms in place to restart the stream. The stream had a read, write, commit-read procedure in place in order to avoid losing elements. In other words, only after a successful write to the sink (a Kafka topic in our case) would the element be taken off the source queue. This procedure provides an at-least-once delivery guarantee for elements. Unfortunately, having this guarantee in place meant that after a restart of the stream the failing element showed up again, and consequently the stream got stuck in a fail-loop...
Given this at-least-once delivery guarantee, it becomes apparent that a mere restart of the stream is not an option when the processing of a stream element fails due to the properties of the element in question. A better idea would be to output the failing element to some message queue or table where someone can investigate the cause of the failure. However, I have not found a suitable mechanism in the akka library to get hold of the failing element when the exception is thrown within a stream.
There is another post addressing this topic. However, the author is using Scala and making use of Either, which per se does not exist in Java.
Consider the following "picky" function applied in a stream of strings.
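The original snippets are not reproduced here, so what follows is only a minimal sketch under stated assumptions. `picky` is a hypothetical function that upper-cases a string and rejects blank input, and `java.util.stream` stands in for the akka-stream so that the example runs without any dependency. The input list is made up as well.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class PickyExample {
    // Hypothetical "picky" function: upper-cases its input, rejects blank strings.
    static String picky(String s) {
        if (s.trim().isEmpty()) {
            throw new IllegalArgumentException("blank input");
        }
        return s.toUpperCase();
    }

    public static void main(String[] args) {
        // The third element is blank, so picky throws when it is processed.
        List<String> input = Arrays.asList("a", "b", " ", "d");
        try {
            List<String> result = input.stream()
                    .map(PickyExample::picky)
                    .collect(Collectors.toList());
            System.out.println(result);
        } catch (IllegalArgumentException e) {
            // Only the exception is in scope here, not the element that caused it.
            System.out.println("stream failed: " + e.getMessage());
        }
    }
}
```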
Obviously the stream will fail on the third element. The akka-stream library provides ways of handling this kind of error. For instance, we can define a supervision strategy which skips the problematic element for a specific type of exception. Another option is to wrap the stream with an actor and restart the stream upon failure. Yet with neither of these options is it possible to get hold of a reference to the failing element. The only thing available is the exception itself. Thus, we are left with logging the exception or bubbling it up to some supervisor. In both cases the reference to the original flow element is lost, and we are left with checking the log or somehow embedding the desired information about the element in the throwable, which does not seem like a viable option.
In contrast, in plain Java it is easy to get hold of the parameter which most likely caused a function call to fail. Just wrap it in a try-catch block:
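The original snippet is missing here, so this is a small sketch of the idea rather than the author's code. `picky` and `describe` are hypothetical names. The point is only that the argument is still in scope inside the catch block.

```java
final class TryCatchExample {
    // Hypothetical "picky" function: upper-cases its input, rejects blank strings.
    static String picky(String s) {
        if (s.trim().isEmpty()) {
            throw new IllegalArgumentException("blank input");
        }
        return s.toUpperCase();
    }

    // Wraps the call so that the failing argument can be reported with the exception.
    static String describe(String element) {
        try {
            return "ok: " + picky(element);
        } catch (IllegalArgumentException e) {
            // 'element' is still accessible here, unlike in a stream-level error handler.
            return "failed on element '" + element + "': " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe("a"));
        System.out.println(describe(" "));
    }
}
```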
How can we apply it within an akka-stream? One option would be to simply wrap the whole body of a function in a try-catch block. However, what would you return from such a function in case an exception is caught? Some element of the correct return type needs to be passed on in order to keep the stream going.
A potentially better approach might be the use of a custom GraphStage. Graphs in akka-stream are used when you have an operation with multiple inputs and/or multiple outputs. The entry and exit points through which the stream elements flow are referred to as in- and out-ports. In our case we have a single in-port (in) and two mutually exclusive out-ports (out, outException). Either the function call succeeds and we pass the resulting element downstream through the normal out-port (out), or we catch the exception thrown from the function and push the problematic, untransformed element down the "special" out-port (outException) into an "exception" sink. The following graph illustrates this basic idea.
The previously introduced stream would now look as follows. The MapWithException class defines the custom made graphstage and MapWithException.map() returns the desired Flow by connecting all the ports of the graphstage as described here.
The MapWithExceptionSink class is shown in the following gist.
The createLogic function returns a GraphStageLogic in which the three ports in, out and outException are assigned their respective handlers. The first handler is an "onPush" handler assigned to the in-port (in) of the graph stage. It defines the behavior upon a push from upstream.
In the handler of the in-port you recognize the core of this GraphStage. It is a try-catch block wrapped around the application of the function passed into MapWithExceptionSink.map(). The sole parameter is the stream element received from upstream. If the function call f.apply() succeeds, we send the result to the "normal" downstream (out); otherwise the original element gets sent to the exception queue (exceptionOut).
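Stripped of all Akka types, the routing decision described above can be sketched in plain Java. This is not the code from the gist: `RoutingSketch`, `route` and `picky` are hypothetical names, and two `Consumer`s stand in for the out-ports out and exceptionOut.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

final class RoutingSketch {
    // Hypothetical "picky" function: upper-cases its input, rejects blank strings.
    static String picky(String s) {
        if (s.trim().isEmpty()) {
            throw new IllegalArgumentException("blank input");
        }
        return s.toUpperCase();
    }

    // Applies f to elem. On success the result goes to 'out'; on failure the
    // original, untransformed element goes to 'exceptionOut'.
    static <I, O> void route(I elem, Function<I, O> f,
                             Consumer<O> out, Consumer<I> exceptionOut) {
        O result;
        try {
            result = f.apply(elem);
        } catch (RuntimeException e) {
            exceptionOut.accept(elem);
            return;
        }
        // Emitting outside the try block keeps downstream errors from being
        // mistaken for failures of f.
        out.accept(result);
    }

    public static void main(String[] args) {
        List<String> ok = new ArrayList<>();
        List<String> failed = new ArrayList<>();
        for (String s : Arrays.asList("a", "b", " ", "d")) {
            route(s, RoutingSketch::picky, ok::add, failed::add);
        }
        System.out.println("out: " + ok);
        System.out.println("exceptionOut: " + failed);
    }
}
```

Only the call to the function sits inside the try block here. Whether the real stage should do the same with its push call is a design choice, but it seems a reasonable way to avoid swallowing errors that have nothing to do with the element.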
out is assigned an "onPull" handler which pulls in if exceptionOut is available, and vice versa for exceptionOut. This reciprocal check of the other out-port's state before pulling ensures that in is only pulled again when both out-ports are available. This argument holds due to the state transitions of an out-port as depicted in the following state diagram from the akka docs. Namely, a port which has been pulled by the connected downstream is available; hence, in the "onPull" handler of the port in question we only need to check whether the other port is available as well.
Making sure that upstream is only asked for the next element when both out-ports are available is crucial to prevent elements from being pushed downstream before the downstream operator is ready. If you do not get this right you will encounter exceptions from the graph stage such as java.lang.IllegalArgumentException: requirement failed: Cannot push port (out) twice. It would also break the backpressure mechanism inherent in akka-streams.
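To make the reciprocal check more tangible, here is a toy model of the demand logic in plain Java. It uses no Akka types: `DemandGate` and its methods are hypothetical, and the boolean fields merely mimic what `isAvailable` and `hasBeenPulled` report in a real GraphStageLogic.

```java
final class DemandGate {
    private boolean outAvailable;          // 'out' has been pulled and not yet pushed
    private boolean exceptionOutAvailable; // same for 'exceptionOut'
    private boolean inPulled;              // upstream has been asked for an element

    // Downstream of 'out' signals demand. Returns true if 'in' gets pulled as a result.
    boolean onPullOut() {
        outAvailable = true;
        return pullInIfBothAvailable();
    }

    // Downstream of 'exceptionOut' signals demand. Returns true if 'in' gets pulled.
    boolean onPullExceptionOut() {
        exceptionOutAvailable = true;
        return pullInIfBothAvailable();
    }

    private boolean pullInIfBothAvailable() {
        if (outAvailable && exceptionOutAvailable && !inPulled) {
            inPulled = true;
            return true;
        }
        return false;
    }

    // An element arrived from upstream and is pushed to exactly one out-port.
    void onPush(boolean functionSucceeded) {
        if (!inPulled) {
            throw new IllegalStateException("element pushed without a pull");
        }
        inPulled = false;
        if (functionSucceeded) {
            outAvailable = false;
        } else {
            exceptionOutAvailable = false;
        }
    }

    public static void main(String[] args) {
        DemandGate gate = new DemandGate();
        System.out.println(gate.onPullOut());          // only one port has demand
        System.out.println(gate.onPullExceptionOut()); // both ports have demand
        gate.onPush(true);                             // element went to 'out'
        System.out.println(gate.onPullOut());          // 'out' has demand again
    }
}
```

Because an element can end up on either out-port, upstream is only asked for the next element once both can accept it. That way a push never hits a port without demand.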
In short, this custom GraphStage allows you to capture elements which cause specific function calls to fail and to output them to a designated sink. For all other exceptions, which are thrown outside of the function in question, we can still apply the normal recovery mechanisms provided by akka-streams.
You can find the full code with some basic unit tests at https://github.com/florin-akermann/exception-sink
P.S.: The exceptionOut port should probably receive the exception as well as the failing element. This can be achieved by defining the port exceptionOut as Outlet<Pair<O1, Throwable>>.
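As a rough illustration of that variant, again without Akka types: the sketch below pairs the failing element with the exception before handing it to the exception consumer. `PairRoutingSketch` and `route` are hypothetical names, and `Map.Entry` from the JDK stands in for the Pair type mentioned above.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

final class PairRoutingSketch {
    // On failure, emits the original element together with the exception it caused.
    static <I, O> void route(I elem, Function<I, O> f, Consumer<O> out,
                             Consumer<Map.Entry<I, Throwable>> exceptionOut) {
        O result;
        try {
            result = f.apply(elem);
        } catch (RuntimeException e) {
            exceptionOut.accept(new SimpleImmutableEntry<I, Throwable>(elem, e));
            return;
        }
        out.accept(result);
    }

    public static void main(String[] args) {
        List<Integer> ok = new ArrayList<>();
        List<Map.Entry<String, Throwable>> failed = new ArrayList<>();
        Function<String, Integer> parse = Integer::parseInt;
        route("7", parse, ok::add, failed::add);
        route("x", parse, ok::add, failed::add);
        System.out.println("out: " + ok);
        for (Map.Entry<String, Throwable> entry : failed) {
            System.out.println("exceptionOut: " + entry.getKey() + " -> " + entry.getValue());
        }
    }
}
```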
A word of caution
The creators of akka-stream specifically warn about the many pitfalls of creating and using custom graph stages.