Monix BIO

Error Handling

When IO fails with an error it short-circuits the computation and returns the error as a result:

import monix.bio.IO
import monix.execution.exceptions.DummyException
import monix.execution.Scheduler.Implicits.global

val fa = IO(println("A"))
val fb = IO.raiseError(DummyException("boom"))
val fc = IO(println("C"))

val task = fa.flatMap(_ => fb).flatMap(_ => fc)

task.runSyncUnsafe()
//=> A
//=> Exception in thread "main" monix.execution.exceptions.DummyException: boom

We can handle the error and keep the computation going with one of the many available methods. For better discoverability, they are often prefixed with onError.

import monix.bio.IO
import monix.execution.exceptions.DummyException
import monix.execution.Scheduler.Implicits.global

val fa = IO(println("A"))
val fb = IO.raiseError(DummyException("boom"))
val fc = IO(println("C"))

val task = fa
  .flatMap(_ => fb)
  .onErrorHandleWith(_ => IO(println("B recovered")))
  .flatMap(_ => fc)
  .onErrorHandleWith(_ => IO(println("C recovered")))

task.runSyncUnsafe()
//=> A
//=> B recovered
//=> C

Error Channels

Many applications divide errors into two types:

  • Recoverable errors which can be acted upon and often have a meaning in the business domain. Examples: Insufficient permissions for a given action, temporary network failure.
  • Non-Recoverable errors which are often fatal or it is not sensible to try to recover from them. Examples: StackOverflow, throwing an exception in a pure function (programmer's error).

IO[E, A] follows this pattern and can fail with two kinds of errors:

  • Errors of type E which represent recoverable errors. Other common names are "typed" or "expected" errors.
  • Errors of type Throwable for non-recoverable errors. We call them "terminal" or "unexpected" errors. You might also see terminology like "defect" or "unchecked failure" in other libraries.

The general guideline is to use a typed error channel for errors that are expected to be handled or have a value for the caller. The internal channel (non-recoverable errors) should be used for errors which can't be handled properly or can only be handled somewhere deep downstream in a generic manner (let's say to return InternalServerError at the edges).

Non-recoverable errors are hidden in the internal error channel which has very few operators and is supposed to be used as rarely as possible. Most of these errors are outside of our control, so ideally we don't have to burden our minds with them and can work with a smaller, more comprehensible domain of errors for most of the code.

The number of possible recoverable errors is often limited and each of them could be handled in a specific way in the business logic. E can be any type, which allows us to be very precise. For instance, we can choose E to be an ADT reflecting errors in our business domain or even use Nothing to show that we don't have to worry about recovering from any errors. Possible errors are provided in the type signature, which serves as always up-to-date documentation and allows us to statically check that all errors are handled. Without typed errors, if a change introduces new errors, we might easily miss it.

Consider the following example:

import monix.bio.{IO, Task}
  
case class ForbiddenNumber() extends Exception

def numberService(i: Int): Task[Int] =
  if (i == 0) IO.raiseError(ForbiddenNumber())
  else IO.now(i * 2)

def callNumberService(i: Int): Task[Int] = numberService(i).onErrorHandleWith {
  case ForbiddenNumber() => callNumberService(i + 1) // try with a different number
  case other => IO.raiseError(other) // propagate error
}

When writing the callNumberService method we have to check the implementation of numberService to see what kind of errors we can expect, because the type signature only specifies Throwable, so it can be pretty much anything. On top of that, we might have to add case other => ... to be safe in case we missed any error and to make our pattern matching exhaustive.

At some point, the implementation might change:

import monix.bio.{IO, Task}
import scala.concurrent.duration._

sealed trait NumberServiceErrors extends Exception

case class ForbiddenNumber() extends NumberServiceErrors
  
case class ServiceTimeout(duration: FiniteDuration) extends NumberServiceErrors

def numberService(i: Int): Task[Int] =
  // Check if we should timeout the caller
  if (i == 5) IO.raiseError(ServiceTimeout(10.second))
  else if (i == 0) IO.raiseError(ForbiddenNumber())
  else IO.now(i * 2)

We introduced a ServiceTimeout error which tells the users that their requests will be accepted again once the given duration passes. It's easy to forget to update callNumberService to support the new behavior, and if we didn't have case other => ... then we would end up with errors at runtime. The method callNumberService would also still compile if we changed the error class of ForbiddenNumber(), leading to more issues.

Now let's see how it would look if we leverage IO capabilities:

import monix.bio.{IO, UIO}
  
case class ForbiddenNumber()

def numberService(i: Int): IO[ForbiddenNumber, Int] =
  if (i == 0) IO.raiseError(ForbiddenNumber())
  else IO.now(i * 2)

def callNumberService(i: Int): UIO[Int] = numberService(i).onErrorHandleWith {
  case ForbiddenNumber() => callNumberService(i + 1) // try with a different number
}

Now numberService specifies possible errors in the type signature, so it is immediately apparent to us and the compiler what the expected failures are. We don't need case other => ... because there are no other possible errors and if they appear in the future the code will stop compiling. As a nice bonus, callNumberService returns UIO[Int] (type alias of IO[Nothing, Int]) which tells whoever uses callNumberService that they don't have to expect any errors.

If we change numberService errors then we will have to change the signature as well:

import monix.bio.{IO, UIO}  
import scala.concurrent.duration._

sealed trait NumberServiceErrors

case class ForbiddenNumber() extends NumberServiceErrors

case class ServiceTimeout(duration: FiniteDuration) extends NumberServiceErrors

def numberService(i: Int): IO[NumberServiceErrors, Int] =
  // Check if we should timeout the caller
  if (i == 5) IO.raiseError(ServiceTimeout(10.second))
  else if (i == 0) IO.raiseError(ForbiddenNumber())
  else IO.now(i * 2)

def callNumberService(i: Int): UIO[Int] = numberService(i).onErrorHandleWith {
  case ForbiddenNumber() => callNumberService(i + 1) // try with a different number
  // will give a warning without this line!
  case ServiceTimeout(timeout) => callNumberService(i).delayExecution(timeout)
}

If the application uses scalacOptions += "-Xfatal-warnings" in build.sbt we will get the following error if we forget to change callNumberService:

Error:(29, 80) match may not be exhaustive.
It would fail on the following input: ServiceTimeout(_)
  def callNumberService(i: Int): UIO[Int] = numberService(i).onErrorHandleWith {

A similar approach is often used with single parameter effects in combination with Either or EitherT, that is:

def numberService(i: Int): IO[Either[NumberServiceErrors, Int]]

In that encoding, the single-parameter IO can fail with a Throwable (the equivalent of the terminal error channel in Monix BIO) and Either can return a Left of any E (the equivalent of the typed error channel). Monix BIO's IO enforces this convention, which makes it more convenient and safer to follow, but if you are familiar with IO of Either then the spirit is the same.

I recommend this article by John De Goes if you are interested in the original motivations behind the idea of embedding this pattern directly in the data type.

Producing a failed IO

An error can occur when an Exception is thrown or we can construct it ourselves with dedicated builder methods.

IO.raiseError

Use IO.raiseError if you already have an error value:

import monix.bio.IO
import monix.execution.Scheduler.Implicits.global

val error = "error"
val task: IO[String, Int] = IO.raiseError(error)

// Left("error")
task.attempt.runSyncUnsafe()

IO.terminate

IO.terminate can raise a terminal error (second channel with Throwable):

import monix.bio.IO
import monix.execution.Scheduler.Implicits.global
import monix.execution.exceptions.DummyException

val error = DummyException("error")
// It doesn't affect the signature
val task: IO[String, Int] = IO.terminate(error)

task.attempt.runSyncUnsafe()
//=> Exception in thread "main" monix.execution.exceptions.DummyException: error

Catching errors in IO.eval

IO.eval (and IO.apply) will catch any errors that are thrown in the method's body and expose them as typed errors:

import monix.bio.{IO, Task}
import monix.execution.Scheduler.Implicits.global
import monix.execution.exceptions.DummyException

val error = DummyException("error")
val task: Task[Int] = IO.eval { throw error }

// Left(DummyException("error"))
task.attempt.runSyncUnsafe()

Catching errors in IO.evalTotal

If we are sure that our side-effecting code won't have any surprises we can use IO.evalTotal but if we are wrong, the error will be caught in the internal error channel:

import monix.bio.{IO, UIO}
import monix.execution.Scheduler.Implicits.global
import monix.execution.exceptions.DummyException

val error = DummyException("error")
val task: UIO[Int] = IO.evalTotal { throw error }

task.attempt.runSyncUnsafe()
//=> Exception in thread "main" monix.execution.exceptions.DummyException: error

Other methods which return UIO or use a generic E (not fixed to Throwable) like map / flatMap will behave in the same way when throwing an exception.

Recovering from Errors

When IO fails, it will skip all subsequent operations until the error is handled. Typed and terminal errors are in different categories - handling typed errors will not do anything to unexpected errors but error handling functions for terminal errors handle "normal" errors as well.

This section only covers the main error handling operators; refer to the API Documentation for the full list.

Typed Errors

Exposing Errors

attempt and materialize take the error away and return it as a normal value:

final def attempt: UIO[Either[E, A]]
final def materialize(implicit ev: E <:< Throwable): UIO[Try[A]]

Note that the return type is UIO indicating that there are no more expected errors to handle.

import monix.bio.{IO, UIO}
import monix.execution.Scheduler.Implicits.global

val error = "error"
val task: IO[String, Int] = IO.raiseError(error)
val attempted: UIO[Either[String, Int]] = task.attempt

// Left("error")
attempted.runSyncUnsafe()

It is common to use attempt before runToFuture or runSyncUnsafe. The typed error will be exposed as a Left and the terminal error will result in a failed Future or an exception thrown (in runSyncUnsafe).
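
As a minimal sketch of that pattern (the value names are only illustrative), the typed error ends up inside the Future's value, so the Future itself only fails on a terminal error:

```scala
import monix.bio.IO
import monix.execution.CancelableFuture
import monix.execution.Scheduler.Implicits.global

import scala.concurrent.Await
import scala.concurrent.duration._

val task: IO[String, Int] = IO.raiseError("error")

// The typed error becomes a Left in the result of the Future
val future: CancelableFuture[Either[String, Int]] = task.attempt.runToFuture

// Blocking here only to inspect the result in this example
val result: Either[String, Int] = Await.result(future, 1.second)
// Left("error")
```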

Both methods have corresponding reverse operations:

final def rethrow[E1 >: E, B](implicit ev: A <:< Either[E1, B]): IO[E1, B]
final def dematerialize[B](implicit evE: E <:< Nothing, evA: A <:< Try[B]): Task[B]

Example:

import monix.bio.IO

val error = "error"
// same as IO.raiseError
val task: IO[String, Int] = IO.raiseError(error).attempt.rethrow 
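
The same round trip can be sketched for materialize and dematerialize, which require the error type to be a Throwable. The value names below are only illustrative:

```scala
import monix.bio.{IO, Task, UIO}
import monix.execution.Scheduler.Implicits.global
import monix.execution.exceptions.DummyException

import scala.util.Try

val failing: Task[Int] = IO.raiseError(DummyException("boom"))

// The typed error is exposed as a scala.util.Failure
val asTry: UIO[Try[Int]] = failing.materialize

// dematerialize moves a Failure back to the typed error channel
val roundTrip: Task[Int] = asTry.dematerialize

// A Failure wrapping the DummyException
asTry.runSyncUnsafe()
```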

onErrorHandle & onErrorHandleWith

IO.onErrorHandleWith is an operation which takes a function, mapping possible exceptions to a desired fallback outcome, so we could do this:

import monix.bio.{IO, UIO}
import monix.execution.Scheduler.Implicits.global

import scala.concurrent.duration._

case class TimeoutException()

val source: IO[TimeoutException, String] =
  IO.evalTotal("Hello!")
    .delayExecution(10.seconds)
    .timeoutWith(3.seconds, TimeoutException())

val recovered: UIO[String] = source.onErrorHandleWith {
  _: TimeoutException => IO.now("Recovered!")
}

recovered.runToFuture.foreach(println)
//=> Recovered!

IO.onErrorHandle is a variant which takes a pure recovery function E => B instead of an effectful E => IO[E1, B] which could also fail.
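
A minimal sketch of onErrorHandle with a pure fallback value. NotFound, settings and lookup are hypothetical names used only for illustration:

```scala
import monix.bio.{IO, UIO}
import monix.execution.Scheduler.Implicits.global

// Hypothetical typed error and data, not part of Monix BIO
final case class NotFound(key: String)

val settings = Map("port" -> 8080)

def lookup(key: String): IO[NotFound, Int] =
  settings.get(key) match {
    case Some(value) => IO.now(value)
    case None        => IO.raiseError(NotFound(key))
  }

// The recovery function is pure, so the result cannot fail with a typed error
val timeout: UIO[Int] = lookup("timeout").onErrorHandle(_ => 30)

timeout.runSyncUnsafe()
// 30
```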

redeem & redeemWith

IO.redeem and IO.redeemWith are a combination of map + onErrorHandle and flatMap + onErrorHandleWith respectively. Conceptually, it is a fold operation.

If task is successful then:

task.redeemWith(fe, fb) <-> task.flatMap(fb)

And when task is failed:

task.redeemWith(fe, fb) <-> task.onErrorHandleWith(fe)

Instead of:

import monix.bio.IO
import monix.execution.exceptions.DummyException
import monix.execution.Scheduler.Implicits.global

val f1 = IO.raiseError(DummyException("boom"))
val f2 = IO(println("A"))

val task = f1
  .attempt
  .flatMap {
    case Left(_) => IO(println("Recovered!"))
    case Right(_) => f2
  }

task.runSyncUnsafe()
//=> Recovered!

You can do this:

import monix.bio.IO
import monix.execution.exceptions.DummyException
import monix.execution.Scheduler.Implicits.global

val f1 = IO.raiseError(DummyException("boom"))
val f2 = IO(println("A"))

val task = f1
  .redeemWith(_ => IO(println("Recovered!")), _ => f2)

task.runSyncUnsafe()
//=> Recovered!

The latter will be more efficient in terms of memory allocations.
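
When both branches are pure, redeem performs the same fold without wrapping the results in IO. A minimal sketch, where DivisionByZero, divide and describe are hypothetical names used only for illustration:

```scala
import monix.bio.{IO, UIO}
import monix.execution.Scheduler.Implicits.global

// Hypothetical typed error, not part of Monix BIO
final case class DivisionByZero(dividend: Int)

def divide(a: Int, b: Int): IO[DivisionByZero, Int] =
  if (b == 0) IO.raiseError(DivisionByZero(a))
  else IO.now(a / b)

// Both the error and the success are folded into a String
def describe(a: Int, b: Int): UIO[String] =
  divide(a, b).redeem(
    err => s"cannot divide ${err.dividend} by zero",
    res => s"result: $res"
  )

describe(10, 2).runSyncUnsafe()
// "result: 5"
describe(1, 0).runSyncUnsafe()
// "cannot divide 1 by zero"
```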

Terminal Errors

Terminal errors ignore all typed error handlers and can only be caught by more powerful methods.

The example below shows how redeemWith does nothing to handle unexpected errors even if it uses the same type:

import monix.bio.IO
import monix.execution.exceptions.DummyException
import monix.execution.Scheduler.Implicits.global

// Note IO.terminate instead of IO.raiseError
val f1 = IO.terminate(DummyException("boom"))
val f2 = IO(println("A"))

val task = f1
  .redeemWith(_ => IO(println("Recovered!")), _ => f2)

task.runSyncUnsafe()
//=> Exception in thread "main" monix.execution.exceptions.DummyException: boom

There are special variants of redeem and redeemWith which are called redeemCause and redeemCauseWith respectively. IO.redeemCause takes a Cause[E] => B function instead of E => B to recover and IO.redeemCauseWith uses a Cause[E] => IO[E1, B].

Cause is defined as follows:

sealed abstract class Cause[+E] extends Product with Serializable {
  // few methods
}

object Cause {
  final case class Error[+E](value: E) extends Cause[E]

  final case class Termination(value: Throwable) extends Cause[Nothing]
}

Let's modify the previous example to use redeemCauseWith:

import monix.bio.IO
import monix.execution.exceptions.DummyException
import monix.execution.Scheduler.Implicits.global

// Note IO.terminate instead of IO.raiseError
val f1 = IO.terminate(DummyException("boom"))
val f2 = IO(println("A"))

val task = f1
  .redeemCauseWith(_ => IO(println("Recovered!")), _ => f2)

task.runSyncUnsafe()
//=> Recovered!

Basically, it is a more powerful version which can access both error channels. In your actual application you might find yourself using typed error handlers (onErrorHandle, redeem etc.) almost all of the time and only use the Cause variants when absolutely necessary, for example at the edges of the application if you don't want to pass a failed IO / Future to your HTTP library.
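
Since the recovery function receives a Cause, it can pattern match to tell the two channels apart. A minimal sketch using the pure redeemCause variant; the describe helper and the sample values are hypothetical:

```scala
import monix.bio.{Cause, IO, UIO}
import monix.execution.Scheduler.Implicits.global
import monix.execution.exceptions.DummyException

// Hypothetical helper which folds every possible outcome into a String
def describe[A](task: IO[String, A]): UIO[String] =
  task.redeemCause(
    {
      case Cause.Error(e)       => s"typed error: $e"
      case Cause.Termination(t) => s"terminal error: ${t.getMessage}"
    },
    a => s"success: $a"
  )

val ok: IO[String, Int] = IO.now(1)
val typed: IO[String, Int] = IO.raiseError("invalid input")
val terminal: IO[String, Int] = IO.terminate(DummyException("boom"))

describe(ok).runSyncUnsafe()
// "success: 1"
describe(typed).runSyncUnsafe()
// "typed error: invalid input"
describe(terminal).runSyncUnsafe()
// "terminal error: boom"
```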

Mapping Errors

mapError

IO.mapError will not handle any error but it can transform it to something else.

It can be useful to convert an error from a smaller type to a bigger type:

import monix.bio.IO
import java.time.Instant

case class ErrorA(i: Int)
case class ErrorB(errA: ErrorA, createdAt: Instant)

val task1: IO[ErrorA, String] = IO.raiseError(ErrorA(10))
val task2: IO[ErrorB, String] = task1.mapError(errA => ErrorB(errA, Instant.now()))

tapError

IO.tapError can peek at the error value and execute provided E => IO[E1, B] function without handling the original error.

For instance, we might want to log the error without handling it:

import monix.bio.IO
import monix.execution.exceptions.DummyException
import monix.execution.Scheduler.Implicits.global

val f1 = IO.raiseError(DummyException("boom"))
val f2 = IO(println("A"))

val task = f1
  .tapError(e => IO(println("Incoming error: " + e)))

task.runSyncUnsafe()
//=> Incoming error: monix.execution.exceptions.DummyException: boom
//=> Exception in thread "main" monix.execution.exceptions.DummyException: boom

Moving errors from the typed error channel

If you are sure that your IO shouldn't have any errors, and that any error which does occur should shut down the task as soon as possible, you can use hideErrors or hideErrorsWith. Both hide the error from the type signature and raise it as a terminal error.

import monix.bio.{IO, UIO}
import monix.execution.exceptions.DummyException
import monix.execution.Scheduler.Implicits.global

val task: UIO[Int] = IO
  .raiseError(DummyException("boom!"))
  .hideErrors
  .map(_ => 10)

// Some(Failure(DummyException(boom!)))
task.runToFuture.value

If your E is not Throwable you can use hideErrorsWith which takes a E => Throwable function.
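
A minimal sketch of hideErrorsWith, where ConfigMissing and loadConfig are hypothetical names used only for illustration:

```scala
import monix.bio.{IO, UIO}
import monix.execution.Scheduler.Implicits.global

// Hypothetical typed error which is not a Throwable
final case class ConfigMissing(key: String)

val loadConfig: IO[ConfigMissing, String] =
  IO.raiseError(ConfigMissing("db.url"))

// The typed error is converted to a Throwable and raised in the terminal channel
val config: UIO[String] = loadConfig.hideErrorsWith { e =>
  new IllegalStateException(s"Missing config key: ${e.key}")
}

// Running it throws the IllegalStateException
val outcome = scala.util.Try(config.runSyncUnsafe())
```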

This method is handy if you are using generic Cats-Effect based libraries, for example:

import monix.bio.{IO, Task}
import monix.catnap.ConcurrentQueue

val queueExample: IO[Throwable, String] = for {
  queue <- ConcurrentQueue[Task].bounded[String](10)
  _ <- queue.offer("Message")
  msg <- queue.poll
} yield msg

monix.catnap.ConcurrentQueue works for a generic effect type (cats.effect.IO, monix.eval.Task, zio.ZIO) but it is written in terms of type classes which unfortunately don't support two channels of errors and fix everything as Throwable.

These methods don't throw any errors so we can safely hide them and have our typed errors back:

import monix.bio.{Task, UIO}
import monix.catnap.ConcurrentQueue

val queueExample: UIO[String] = (for {
  queue <- ConcurrentQueue[Task].bounded[String](10)
  _ <- queue.offer("Message")
  msg <- queue.poll
} yield msg).hideErrors

Restarting on Error

The IO type represents a specification of a computation, so it can be freely restarted if we wish to do so.

There are a few retry combinators available, but in general it is quite simple to write a custom recursive function. For instance, retry with exponential backoff would look as follows:

import monix.bio.IO
import scala.concurrent.duration._

def retryBackoff[E, A](source: IO[E, A],
  maxRetries: Int, firstDelay: FiniteDuration): IO[E, A] = {

  source.onErrorHandleWith { ex =>
      if (maxRetries > 0)
        // Recursive call, it's OK as Monix is stack-safe
        retryBackoff(source, maxRetries - 1, firstDelay * 2)
          .delayExecution(firstDelay)
      else
        IO.raiseError(ex)
  }
}
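
A usage sketch for the function above, repeated here so that the snippet is self-contained. The attempts counter and the flaky action are hypothetical and only simulate a source that fails twice before succeeding:

```scala
import java.util.concurrent.atomic.AtomicInteger

import monix.bio.IO
import monix.execution.Scheduler.Implicits.global

import scala.concurrent.duration._

def retryBackoff[E, A](source: IO[E, A],
  maxRetries: Int, firstDelay: FiniteDuration): IO[E, A] =
  source.onErrorHandleWith { ex =>
    if (maxRetries > 0)
      retryBackoff(source, maxRetries - 1, firstDelay * 2)
        .delayExecution(firstDelay)
    else
      IO.raiseError(ex)
  }

val attempts = new AtomicInteger(0)

// Hypothetical flaky action: fails with a typed error on the first two runs
val flaky: IO[String, Int] = IO.deferTotal {
  val n = attempts.incrementAndGet()
  if (n < 3) IO.raiseError(s"attempt $n failed")
  else IO.now(n)
}

// Retried after 10 ms and then after 20 ms; the third run succeeds
val result = retryBackoff(flaky, maxRetries = 5, firstDelay = 10.millis)
  .attempt
  .runSyncUnsafe()
// Right(3)
```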

In more complicated cases it's worth taking a look at the cats-retry library and/or using a stream (e.g. fs2, Monix Observable) instead of recursive functions.

Reporting Uncaught Errors

Losing errors is unacceptable. We can't always return them as an IO result because sometimes the failure could happen concurrently and IO could already be finished with a different value. In this case the error is reported with Scheduler.reportFailure which by default logs uncaught errors to System.err:

import monix.bio.IO
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._

// Ensures asynchronous execution, just to show
// that the action doesn't happen on the
// current thread
val task = IO(2).delayExecution(1.second)

task.runAsync { r =>
  throw new IllegalStateException(r.toString)
}

// After 1 second, this will log the whole stack trace:
//=> java.lang.IllegalStateException: Right(2)
//=>    ...
//=> at monix.bio.BiCallback$$anon$3.tryApply(BiCallback.scala:359)
//=> at monix.bio.BiCallback$$anon$3.apply(BiCallback.scala:352)
//=> at monix.bio.BiCallback$$anon$3.onSuccess(BiCallback.scala:345)
//=> at monix.bio.internal.TaskRunLoop$.startFull(TaskRunLoop.scala:213)
//=> at monix.bio.internal.TaskRestartCallback.syncOnSuccess(TaskRestartCallback.scala:125)
//=> at monix.bio.internal.TaskRestartCallback.onSuccess(TaskRestartCallback.scala:83)
//=> ....

We can customize the behavior to use anything we'd like:

import monix.bio.IO
import monix.execution.Scheduler
import monix.execution.UncaughtExceptionReporter
import scala.concurrent.duration._

val reporter = UncaughtExceptionReporter { ex =>
  // our own fancy logger
  println("Customized printing of uncaught exception: " + ex)
}

implicit val s: Scheduler = Scheduler(Scheduler.global, reporter)

val task = IO(2).delayExecution(1.second)

task.runAsync { r =>
  throw new IllegalStateException(r.toString)
}

// After 1 second:
//=> Customized printing of uncaught exception: java.lang.IllegalStateException: Right(2)

Cats Instances

If you are a Cats user then IO provides ApplicativeError and MonadError instances.

If you import cats.syntax.monadError._, cats.syntax.applicativeError._ or just cats.syntax.all._ you will have access to all the methods provided by the library.

The main gotcha is that anything requiring Sync and above will only work for IO[Throwable, A].


Copyright © 2019-2021 The Monix Project Developers.