Error Handling
When IO fails with an error, it short-circuits the computation and returns the error as a result:
import monix.bio.IO
import monix.execution.exceptions.DummyException
import monix.execution.Scheduler.Implicits.global
val fa = IO(println("A"))
val fb = IO.raiseError(DummyException("boom"))
val fc = IO(println("C"))
val task = fa.flatMap(_ => fb).flatMap(_ => fc)
task.runSyncUnsafe()
//=> A
//=> Exception in thread "main" monix.execution.exceptions.DummyException: boom
We can handle the error and let the computation continue with one of the many available methods.
For better discoverability, they are often prefixed with onError.
import monix.bio.IO
import monix.execution.exceptions.DummyException
import monix.execution.Scheduler.Implicits.global
val fa = IO(println("A"))
val fb = IO.raiseError(DummyException("boom"))
val fc = IO(println("C"))
val task = fa
.flatMap(_ => fb)
.onErrorHandleWith(_ => IO(println("B recovered")))
.flatMap(_ => fc)
.onErrorHandleWith(_ => IO(println("C recovered")))
task.runSyncUnsafe()
//=> A
//=> B recovered
//=> C
Error Channels
Many applications divide errors into two types:
- Recoverable errors which can be acted upon and often have a meaning in the business domain. Examples: Insufficient permissions for a given action, temporary network failure.
- Non-recoverable errors which are often fatal, or which it is not sensible to try to recover from. Examples: StackOverflow, throwing an exception in a pure function (programmer's error).
IO[E, A] follows this pattern and can fail with two kinds of errors:
- Errors of type E, which represent recoverable errors. Other common names are "typed" or "expected" errors.
- Errors of type Throwable for non-recoverable errors. We call them "terminal" or "unexpected" errors. You might also see terminology like "defect" or "unchecked failure" in other libraries.
The general guideline is to use a typed error channel for errors that are expected to be handled or have a value for the caller.
The internal channel (non-recoverable errors) should be used for errors which can't be handled properly or can only be handled somewhere deep downstream in a generic manner (let's say to return InternalServerError at the edges).
Non-recoverable errors are hidden in the internal error channel, which has very few operators and is supposed to be used as rarely as possible. Most of these errors are outside of our control, so ideally we don't burden our minds with them and work with a smaller, more comprehensible error domain for most of the code.
The number of possible recoverable errors is often limited and each of them could be handled in a specific way in the business logic.
E can be any type, which allows us to be very precise.
For instance, we can choose E to be an ADT reflecting errors in our business domain or even use Nothing to show that we don't have to worry about recovering from any errors.
Possible errors are provided in the type signature, which serves as always up-to-date documentation and allows us to easily statically check if all errors are handled.
Without typed errors, if a change introduces new errors, we might easily miss them.
Consider the following example:
import monix.bio.{IO, Task}
case class ForbiddenNumber() extends Exception
def numberService(i: Int): Task[Int] =
if (i == 0) IO.raiseError(ForbiddenNumber())
else IO.now(i * 2)
def callNumberService(i: Int): Task[Int] = numberService(i).onErrorHandleWith {
case ForbiddenNumber() => callNumberService(i + 1) // try with a different number
case other => IO.raiseError(other) // propagate error
}
When writing the callNumberService method we have to check the implementation of numberService to see what kind of errors we can expect, because the type signature only specifies Throwable, so it can be pretty much anything.
On top of that, we might have to add case other => ... to be safe in case we missed any error and to make our pattern matching exhaustive.
At some point, the implementation might change:
import monix.bio.{IO, Task}
import scala.concurrent.duration._
sealed trait NumberServiceErrors extends Exception
case class ForbiddenNumber() extends NumberServiceErrors
case class ServiceTimeout(duration: FiniteDuration) extends NumberServiceErrors
def numberService(i: Int): Task[Int] =
// Check if we should timeout the caller
if (i == 5) IO.raiseError(ServiceTimeout(10.second))
else if (i == 0) IO.raiseError(ForbiddenNumber())
else IO.now(i * 2)
We introduced a ServiceTimeout error which tells the users that their requests will be accepted once the given duration passes.
It's easy to forget to update callNumberService to support the new behavior, and if we didn't have case other => ... then we would end up with errors at runtime.
The method callNumberService would also compile if we changed the error class of ForbiddenNumber(), leading to more issues.
Now let's see what it would look like if we leverage the capabilities of IO:
import monix.bio.{IO, UIO}
case class ForbiddenNumber()
def numberService(i: Int): IO[ForbiddenNumber, Int] =
if (i == 0) IO.raiseError(ForbiddenNumber())
else IO.now(i * 2)
def callNumberService(i: Int): UIO[Int] = numberService(i).onErrorHandleWith {
case ForbiddenNumber() => callNumberService(i + 1) // try with a different number
}
Now numberService specifies possible errors in the type signature, so it is immediately apparent to us and the compiler what the expected failures are.
We don't need case other => ... because there are no other possible errors, and if they appear in the future the code will stop compiling.
As a nice bonus, callNumberService returns UIO[Int] (a type alias of IO[Nothing, Int]), which tells whoever uses callNumberService that they don't have to expect any errors.
If we change the errors of numberService then we will have to change the signature as well:
import monix.bio.{IO, UIO}
import scala.concurrent.duration._
sealed trait NumberServiceErrors
case class ForbiddenNumber() extends NumberServiceErrors
case class ServiceTimeout(duration: FiniteDuration) extends NumberServiceErrors
def numberService(i: Int): IO[NumberServiceErrors, Int] =
// Check if we should timeout the caller
if (i == 5) IO.raiseError(ServiceTimeout(10.second))
else if (i == 0) IO.raiseError(ForbiddenNumber())
else IO.now(i * 2)
def callNumberService(i: Int): UIO[Int] = numberService(i).onErrorHandleWith {
case ForbiddenNumber() => callNumberService(i + 1) // try with a different number
// will give a warning without this line!
case ServiceTimeout(timeout) => callNumberService(i).delayExecution(timeout)
}
If the application uses scalacOptions += "-Xfatal-warnings" in build.sbt, we will get the following error if we forget to change callNumberService:
Error:(29, 80) match may not be exhaustive.
It would fail on the following input: ServiceTimeout(_)
def callNumberService(i: Int): UIO[Int] = numberService(i).onErrorHandleWith {
A similar approach is often used with single parameter effects in combination with Either or EitherT, that is:
def numberService(i: Int): IO[Either[NumberServiceErrors, Int]]
Here the single parameter IO can fail with Throwable (the counterpart of the terminal error channel in monix.bio.IO) and Either can return a Left of any E (the counterpart of the typed error channel).
monix.bio.IO enforces this convention, which makes it more convenient and safer to follow, but if you are familiar with an IO of Either then the spirit is the same.
I recommend this article by John De Goes if you are interested in the original motivations behind the idea of embedding this pattern directly in the data type.
Producing a failed IO
An error can occur when an Exception is thrown, or we can construct it ourselves with dedicated builder methods.
IO.raiseError
Use IO.raiseError if you already have an error value:
import monix.bio.IO
import monix.execution.Scheduler.Implicits.global
val error = "error"
val task: IO[String, Int] = IO.raiseError(error)
// Left("error")
task.attempt.runSyncUnsafe()
IO.terminate
IO.terminate can raise a terminal error (the second channel, with Throwable):
import monix.bio.IO
import monix.execution.Scheduler.Implicits.global
import monix.execution.exceptions.DummyException
val error = DummyException("error")
// It doesn't affect the signature
val task: IO[String, Int] = IO.terminate(error)
task.attempt.runSyncUnsafe()
//=> Exception in thread "main" monix.execution.exceptions.DummyException: error
Catching errors in IO.eval
IO.eval (and IO.apply) will catch any errors that are thrown in the method's body and expose them as typed errors:
import monix.bio.{IO, Task}
import monix.execution.Scheduler.Implicits.global
import monix.execution.exceptions.DummyException
val error = DummyException("error")
val task: Task[Int] = IO.eval { throw error }
// Left(DummyException("error"))
task.attempt.runSyncUnsafe()
Catching errors in IO.evalTotal
If we are sure that our side-effecting code won't have any surprises we can use IO.evalTotal, but if we are wrong, the error will be caught in the internal error channel:
import monix.bio.{IO, UIO}
import monix.execution.Scheduler.Implicits.global
import monix.execution.exceptions.DummyException
val error = DummyException("error")
val task: UIO[Int] = IO.evalTotal { throw error }
task.attempt.runSyncUnsafe()
//=> Exception in thread "main" monix.execution.exceptions.DummyException: error
Other methods which return UIO or use a generic E (not fixed to Throwable), like map / flatMap, will behave in the same way when throwing an exception.
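As a minimal sketch of that last point (the value names here are made up for illustration), an exception thrown inside map on a UIO does not appear in the typed channel. Inspecting the outcome with redeemCause, which is covered further down, should take the Termination branch:

```scala
import monix.bio.{Cause, IO, UIO}
import monix.execution.Scheduler.Implicits.global

// The function passed to map throws, although the type says "no typed errors"
val task: UIO[Int] =
  IO.now(1).map(_ => throw new IllegalStateException("boom"))

// redeemCause sees both channels, so we can tell which one the failure used
val described: UIO[String] = task.redeemCause(
  {
    case Cause.Error(_)       => "typed error"
    case Cause.Termination(e) => "terminal error: " + e.getMessage
  },
  value => "success: " + value
)

val result: String = described.runSyncUnsafe()
println(result)
```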
Recovering from Errors
When IO fails, it will skip all subsequent operations until the error is handled.
Typed and terminal errors are handled separately: handlers for typed errors do nothing to terminal errors, while handlers for terminal errors handle typed errors as well.
This section covers only the main error handling operators; refer to the API documentation for the full list.
Typed Errors
Exposing Errors
attempt and materialize take the error away and return it as a normal value:
final def attempt: UIO[Either[E, A]]
final def materialize(implicit ev: E <:< Throwable): UIO[Try[A]]
Note that the return type is UIO, indicating that there are no more expected errors to handle.
import monix.bio.{IO, UIO}
import monix.execution.Scheduler.Implicits.global
val error = "error"
val task: IO[String, Int] = IO.raiseError(error)
val attempted: UIO[Either[String, Int]] = task.attempt
// Left("error")
attempted.runSyncUnsafe()
It is common to use attempt before runToFuture or runSyncUnsafe.
The typed error will be exposed as a Left and the terminal error will result in a failed Future or a thrown exception (in runSyncUnsafe).
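A small sketch of that pattern, assuming a String error type purely for illustration: the typed error arrives as a Left inside a successful Future, so it can be pattern matched like any other value.

```scala
import monix.bio.IO
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

val task: IO[String, Int] = IO.raiseError("error")

// attempt moves the typed error into the value, so the Future itself succeeds
val future: Future[Either[String, Int]] = task.attempt.runToFuture

future.foreach {
  case Left(err)    => println("Typed error: " + err)
  case Right(value) => println("Success: " + value)
}

// Blocking only to observe the result in this sketch
val outcome: Either[String, Int] = Await.result(future, 5.seconds)
```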
Both methods have corresponding reverse operations:
final def rethrow[E1 >: E, B](implicit ev: A <:< Either[E1, B]): IO[E1, B]
final def dematerialize[B](implicit evE: E <:< Nothing, evA: A <:< Try[B]): Task[B]
Example:
import monix.bio.IO
val error = "error"
// same as IO.raiseError
val task: IO[String, Int] = IO.raiseError(error).attempt.rethrow
onErrorHandle & onErrorHandleWith
IO.onErrorHandleWith is an operation which takes a function mapping possible errors to a desired fallback outcome, so we could do this:
import monix.bio.{IO, UIO}
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._
case class TimeoutException()
val source: IO[TimeoutException, String] =
IO.evalTotal("Hello!")
.delayExecution(10.seconds)
.timeoutWith(3.seconds, TimeoutException())
val recovered: UIO[String] = source.onErrorHandleWith {
_: TimeoutException => IO.now("Recovered!")
}
recovered.attempt.runToFuture.foreach(println)
//=> Right(Recovered!)
IO.onErrorHandle is a variant which takes a pure recovery function E => B instead of an effectful E => IO[E1, B] which could also fail.
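For comparison, a minimal sketch of the pure variant (the error message and the fallback value are arbitrary choices for this example):

```scala
import monix.bio.{IO, UIO}
import monix.execution.Scheduler.Implicits.global

val failing: IO[String, Int] = IO.raiseError("not a number")

// The recovery function returns a plain value, so no typed error remains
val recovered: UIO[Int] = failing.onErrorHandle(_ => -1)

val value: Int = recovered.runSyncUnsafe()
println(value)
//=> -1
```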
redeem & redeemWith
IO.redeem and IO.redeemWith are a combination of map + onErrorHandle and flatMap + onErrorHandleWith respectively.
Conceptually, it is a fold operation.
If task is successful then:
task.redeemWith(fe, fb) <-> task.flatMap(fb)
And when task is failed:
task.redeemWith(fe, fb) <-> task.onErrorHandleWith(fe)
Instead of:
import monix.bio.IO
import monix.execution.exceptions.DummyException
import monix.execution.Scheduler.Implicits.global
val f1 = IO.raiseError(DummyException("boom"))
val f2 = IO(println("A"))
val task = f1
.attempt
.flatMap {
case Left(_) => IO(println("Recovered!"))
case Right(_) => f2
}
task.runSyncUnsafe()
//=> Recovered!
You can do this:
import monix.bio.IO
import monix.execution.exceptions.DummyException
import monix.execution.Scheduler.Implicits.global
val f1 = IO.raiseError(DummyException("boom"))
val f2 = IO(println("A"))
val task = f1
.redeemWith(_ => IO(println("Recovered!")), _ => f2)
task.runSyncUnsafe()
//=> Recovered!
The latter will be more efficient in terms of memory allocations.
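The pure variant, redeem, works the same way with plain functions. A short sketch, with made-up messages, that folds both outcomes into a String:

```scala
import monix.bio.{IO, UIO}
import monix.execution.Scheduler.Implicits.global

val failed: IO[String, Int] = IO.raiseError("bad input")
val succeeded: IO[String, Int] = IO.now(21)

// First function handles the typed error, second one maps the success value
def describe(io: IO[String, Int]): UIO[String] =
  io.redeem(err => "failed: " + err, n => "doubled: " + (n * 2))

val a: String = describe(failed).runSyncUnsafe()
val b: String = describe(succeeded).runSyncUnsafe()
println(a)
//=> failed: bad input
println(b)
//=> doubled: 42
```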
Terminal Errors
Terminal errors ignore all typed error handlers and can only be caught by more powerful methods.
The example below shows how redeemWith does nothing to handle unexpected errors even if it uses the same type:
import monix.bio.IO
import monix.execution.exceptions.DummyException
import monix.execution.Scheduler.Implicits.global
// Note IO.terminate instead of IO.raiseError
val f1 = IO.terminate(DummyException("boom"))
val f2 = IO(println("A"))
val task = f1
.redeemWith(_ => IO(println("Recovered!")), _ => f2)
task.runSyncUnsafe()
//=> Exception in thread "main" monix.execution.exceptions.DummyException: boom
There are special variants of redeem and redeemWith which are called redeemCause and redeemCauseWith respectively.
IO.redeemCause takes a Cause[E] => B function instead of E => B to recover, and IO.redeemCauseWith uses a Cause[E] => IO[E1, B].
Cause is defined as follows:
sealed abstract class Cause[+E] extends Product with Serializable {
// few methods
}
object Cause {
final case class Error[+E](value: E) extends Cause[E]
final case class Termination(value: Throwable) extends Cause[Nothing]
}
Let's modify the previous example to use redeemCauseWith:
import monix.bio.IO
import monix.execution.exceptions.DummyException
import monix.execution.Scheduler.Implicits.global
// Note IO.terminate instead of IO.raiseError
val f1 = IO.terminate(DummyException("boom"))
val f2 = IO(println("A"))
val task = f1
.redeemCauseWith(_ => IO(println("Recovered!")), _ => f2)
task.runSyncUnsafe()
//=> Recovered!
Basically, it is a more powerful version which can access both error channels.
In your actual application you might find yourself using typed error handlers (onErrorHandle, redeem, etc.) almost all of the time and only use the Cause variants when absolutely necessary, for example at the edges of the application if you don't want to pass a failed IO / Future to your HTTP library.
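As a rough sketch of what such an edge could look like, the snippet below folds both channels into a status code. AppError, NotFound, Unauthorized and toStatusCode are hypothetical names invented for this example, not part of the library:

```scala
import monix.bio.{Cause, IO, UIO}
import monix.execution.Scheduler.Implicits.global

// Hypothetical business errors, for illustration only
sealed trait AppError
case object NotFound extends AppError
case object Unauthorized extends AppError

// Typed errors get specific codes, anything terminal becomes a generic 500
def toStatusCode(response: IO[AppError, String]): UIO[Int] =
  response.redeemCause(
    {
      case Cause.Error(NotFound)     => 404
      case Cause.Error(Unauthorized) => 401
      case Cause.Termination(_)      => 500
    },
    _ => 200
  )

val notFound: Int = toStatusCode(IO.raiseError(NotFound)).runSyncUnsafe()
val crashed: Int = toStatusCode(IO.terminate(new RuntimeException("bug"))).runSyncUnsafe()
val ok: Int = toStatusCode(IO.now("body")).runSyncUnsafe()
```

Keeping this conversion in one place means the rest of the code can stay with the typed handlers.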
Mapping Errors
mapError
IO.mapError will not handle any error but it can transform it to something else.
It can be useful to convert an error from a narrower type to a wider type:
import monix.bio.IO
import java.time.Instant
case class ErrorA(i: Int)
case class ErrorB(errA: ErrorA, createdAt: Instant)
val task1: IO[ErrorA, String] = IO.raiseError(ErrorA(10))
val task2: IO[ErrorB, String] = task1.mapError(errA => ErrorB(errA, Instant.now()))
tapError
IO.tapError can peek at the error value and execute the provided E => IO[E1, B] function without handling the original error.
For instance, we might want to log the error without handling it:
import monix.bio.IO
import monix.execution.exceptions.DummyException
import monix.execution.Scheduler.Implicits.global
val f1 = IO.raiseError(DummyException("boom"))
val f2 = IO(println("A"))
val task = f1
.tapError(e => IO(println("Incoming error: " + e)))
task.runSyncUnsafe()
//=> Incoming error: monix.execution.exceptions.DummyException: boom
//=> Exception in thread "main" monix.execution.exceptions.DummyException: boom
Moving errors from the typed error channel
If you are sure that your IO shouldn't have any errors, and that any error which does occur should shut down the task as soon as possible, you can use hideErrors and hideErrorsWith, which hide the error from the type signature and raise it as a terminal error.
import monix.bio.{IO, UIO}
import monix.execution.exceptions.DummyException
import monix.execution.Scheduler.Implicits.global
val task: UIO[Int] = IO
.raiseError(DummyException("boom!"))
.hideErrors
.map(_ => 10)
// Some(Failure(DummyException(boom!)))
task.runToFuture.value
If your E is not Throwable you can use hideErrorsWith, which takes an E => Throwable function.
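A minimal sketch of hideErrorsWith, assuming a made-up ParseError type that does not extend Throwable:

```scala
import monix.bio.{Cause, IO, UIO}
import monix.execution.Scheduler.Implicits.global

// Hypothetical error type, for illustration only
case class ParseError(message: String)

val parsed: IO[ParseError, Int] = IO.raiseError(ParseError("not a number"))

// The typed error is converted to a Throwable and moved to the terminal channel
val hidden: UIO[Int] =
  parsed.hideErrorsWith(e => new IllegalArgumentException(e.message))

// Only a Cause-aware handler can still observe it
val observed: String = hidden
  .redeemCause(
    {
      case Cause.Error(_)       => "typed"
      case Cause.Termination(e) => "terminal: " + e.getMessage
    },
    n => "success: " + n
  )
  .runSyncUnsafe()
```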
hideErrors is handy if you are using generic Cats-Effect based libraries, for example:
import monix.bio.{IO, Task}
import monix.catnap.ConcurrentQueue
val queueExample: IO[Throwable, String] = for {
queue <- ConcurrentQueue[Task].bounded[String](10)
_ <- queue.offer("Message")
msg <- queue.poll
} yield msg
monix.catnap.ConcurrentQueue works for a generic effect type (cats.effect.IO, monix.eval.Task, zio.ZIO) but it is written in terms of type classes which unfortunately don't support two channels of errors and fix everything as Throwable.
These methods don't throw any errors so we can safely hide them and have our typed errors back:
import monix.bio.{Task, UIO}
import monix.catnap.ConcurrentQueue
val queueExample: UIO[String] = (for {
queue <- ConcurrentQueue[Task].bounded[String](10)
_ <- queue.offer("Message")
msg <- queue.poll
} yield msg).hideErrors
Restarting on Error
The IO type represents a specification of a computation, so it can be freely restarted if we wish to do so.
There are a few retry combinators available, but in general it is quite simple to write a custom recursive function. For instance, retry with exponential backoff would look as follows:
import monix.bio.IO
import scala.concurrent.duration._
def retryBackoff[E, A](source: IO[E, A],
maxRetries: Int, firstDelay: FiniteDuration): IO[E, A] = {
source.onErrorHandleWith { ex =>
if (maxRetries > 0)
// Recursive call, it's OK as Monix is stack-safe
retryBackoff(source, maxRetries - 1, firstDelay * 2)
.delayExecution(firstDelay)
else
IO.raiseError(ex)
}
}
In more complicated cases it's worth taking a look at cats-retry library and/or use a stream (e.g. fs2, Monix Observable) instead of recursive functions.
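For simple cases one of the built-in combinators may be enough. The sketch below uses onErrorRestart on a deliberately flaky task; the attempt counter and the error message are assumptions made up for the example:

```scala
import java.util.concurrent.atomic.AtomicInteger
import monix.bio.IO
import monix.execution.Scheduler.Implicits.global

val attempts = new AtomicInteger(0)

// Fails with a typed error on the first two runs, succeeds on the third
val flaky: IO[String, Int] = IO.deferTotal {
  if (attempts.incrementAndGet() < 3) IO.raiseError("not ready yet")
  else IO.now(42)
}

// Restart the whole task on a typed error, at most 5 times
val retried: IO[String, Int] = flaky.onErrorRestart(5)

val result: Either[String, Int] = retried.attempt.runSyncUnsafe()
println(result)
//=> Right(42)
```

Unlike retryBackoff above, this variant restarts immediately without any delay between the attempts.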
Reporting Uncaught Errors
Losing errors is unacceptable.
We can't always return them as an IO result because sometimes the failure could happen concurrently and the IO could already be finished with a different value.
In this case the error is reported with Scheduler.reportFailure, which by default logs uncaught errors to System.err:
import monix.bio.IO
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._
// Ensures asynchronous execution, just to show
// that the action doesn't happen on the
// current thread
val task = IO(2).delayExecution(1.second)
task.runAsync { r =>
throw new IllegalStateException(r.toString)
}
// After 1 second, this will log the whole stack trace:
//=> java.lang.IllegalStateException: Right(2)
//=> ...
//=> at monix.bio.BiCallback$$anon$3.tryApply(BiCallback.scala:359)
//=> at monix.bio.BiCallback$$anon$3.apply(BiCallback.scala:352)
//=> at monix.bio.BiCallback$$anon$3.onSuccess(BiCallback.scala:345)
//=> at monix.bio.internal.TaskRunLoop$.startFull(TaskRunLoop.scala:213)
//=> at monix.bio.internal.TaskRestartCallback.syncOnSuccess(TaskRestartCallback.scala:125)
//=> at monix.bio.internal.TaskRestartCallback.onSuccess(TaskRestartCallback.scala:83)
//=> ....
We can customize the behavior to use anything we'd like:
import monix.bio.IO
import monix.execution.Scheduler
import monix.execution.UncaughtExceptionReporter
import scala.concurrent.duration._
val reporter = UncaughtExceptionReporter { ex =>
// our own fancy logger
println("Customized printing of uncaught exception: " + ex)
}
implicit val s = Scheduler(Scheduler.global, reporter)
val task = IO(2).delayExecution(1.second)
task.runAsync { r =>
throw new IllegalStateException(r.toString)
}
// After 1 second:
//=> Customized printing of uncaught exception: java.lang.IllegalStateException: Right(2)
Cats Instances
If you are a Cats user then IO provides ApplicativeError and MonadError instances.
If you import cats.syntax.monadError._, cats.syntax.applicativeError._ or just cats.syntax.all._ you will have access to all the methods provided by the library.
The main gotcha is that anything requiring Sync and above will only work for IO[Throwable, A].