Resource Safety
Many tasks use some kind of resource, such as a connection pool, a file handle, or a socket. It's crucial to close them when we finish using them; otherwise, we end up with resource leaks.
IO provides ways to do it safely and goes beyond the capabilities of a try-catch-finally block.
Running finalizer
guarantee ensures that we run the provided finalizer regardless of the exit condition, be it successful completion, failure, or cancellation.
guaranteeCase is a variant which takes ExitCase[Cause[E]] => UIO[Unit] and can distinguish between different exit conditions.
import cats.effect.ExitCase
import monix.bio.{IO, UIO}
import monix.execution.Scheduler.Implicits.global
val slowerTask = IO.never.guaranteeCase {
  case ExitCase.Completed => UIO(println("Successful completion"))
  case ExitCase.Error(e) => UIO(println(s"Encountered an error: $e"))
  case ExitCase.Canceled => UIO(println("Task has been cancelled"))
}
val fasterTask = UIO.evalAsync(10)
IO.race(slowerTask, fasterTask).runSyncUnsafe()
//=> Task has been cancelled
IO.race cancels the slower task, which executes the corresponding finalizer.
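The example above uses guaranteeCase. For the plain guarantee operator, a minimal sketch of the failure case could look like the following. The events buffer is a hypothetical helper introduced only to record that the finalizer ran; it is not part of the library.

```scala
import monix.bio.{IO, UIO}
import monix.execution.Scheduler.Implicits.global
import scala.collection.mutable.ListBuffer

// Hypothetical log, used only to observe that the finalizer ran
val events = ListBuffer.empty[String]

// The task fails with a typed error, but the finalizer is still executed
val task: IO[String, Int] =
  IO.raiseError("boom").guarantee(UIO { events += "finalizer ran"; () })

// attempt exposes the typed error as a Left instead of throwing
val result: Either[String, Int] = task.attempt.runSyncUnsafe()
```

Here result holds the typed error from the task, and events shows that the finalizer was executed despite the failure.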
Safe acquisition and release
bracket is a more general version of the try-with-resources pattern: it works for pure effect types like IO and supports concurrency and cancellation.
import java.io._
import monix.bio.{Task, UIO}
def readFirstLine(file: File): Task[String] = {
  val acquire = Task(new BufferedReader(new FileReader(file)))
  // Usage (the try block)
  val use: BufferedReader => Task[String] = in => Task(in.readLine())
  // Releasing the reader (the finally block)
  val release: BufferedReader => UIO[Unit] = in => Task(in.close()).onErrorHandle(_ => ())
  acquire.bracket(use)(release)
}
If the acquire Task is successful, its result is passed to use.
Note that release expects UIO: you can ignore any errors or raise them as terminal errors, but make sure to consider what should happen if the finalizer fails.
Don't leak any resources!
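The two options for a release function can be sketched as follows. This is only an illustration: faulty is a hypothetical resource whose close() always fails, and the names releaseIgnoring and releaseTerminal are made up for this example. hideErrors moves a Throwable error from the typed channel to the terminal one.

```scala
import java.io.{Closeable, IOException}
import monix.bio.{Task, UIO}
import monix.execution.Scheduler.Implicits.global

// Hypothetical resource whose close() always fails, for illustration only
val faulty: Closeable = new Closeable {
  def close(): Unit = throw new IOException("close failed")
}

// Option 1: ignore any error raised while closing
val releaseIgnoring: Closeable => UIO[Unit] =
  r => Task(r.close()).onErrorHandle(_ => ())

// Option 2: re-raise the error as a terminal (untyped) failure
val releaseTerminal: Closeable => UIO[Unit] =
  r => Task(r.close()).hideErrors

// Completes normally because the IOException is swallowed
releaseIgnoring(faulty).runSyncUnsafe()
// Running releaseTerminal(faulty) instead would throw the IOException
```

Which option fits depends on whether a failed close should stop the program or can be safely ignored.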
Similarly to guarantee, there is also a bracketCase variant.
Let's use it to only close the BufferedReader on cancellation:
import java.io._
import cats.effect.ExitCase
import monix.bio.{Task, UIO}
import monix.bio.Cause
def readFirstLine(file: File): Task[String] = {
  val acquire = Task(new BufferedReader(new FileReader(file)))
  // Usage (the try block)
  val use: BufferedReader => Task[String] = in => Task(in.readLine())
  // Releasing the reader (the finally block)
  val release: (BufferedReader, ExitCase[Cause[Throwable]]) => UIO[Unit] = {
    case (_, ExitCase.Error(_) | ExitCase.Completed) => UIO.unit
    case (in, ExitCase.Canceled) => Task(in.close()).onErrorHandle(_ => ())
  }
  acquire.bracketCase(use)(release)
}
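The example above closes the reader only on cancellation in order to show the pattern match. As a hedged variant, the sketch below closes the reader on every exit condition and uses the ExitCase only to choose a log message. The name readFirstLineLogged and the temporary file are assumptions made for this illustration.

```scala
import java.io._
import java.nio.file.Files
import cats.effect.ExitCase
import monix.bio.{Cause, Task, UIO}
import monix.execution.Scheduler.Implicits.global

// Hypothetical variant: always closes the reader, logs the exit condition
def readFirstLineLogged(file: File): Task[String] = {
  val acquire = Task(new BufferedReader(new FileReader(file)))
  val use: BufferedReader => Task[String] = in => Task(in.readLine())
  val release: (BufferedReader, ExitCase[Cause[Throwable]]) => UIO[Unit] =
    (in, exitCase) => {
      val log: UIO[Unit] = exitCase match {
        case ExitCase.Completed    => UIO(println("read finished"))
        case ExitCase.Error(cause) => UIO(println(s"read failed: $cause"))
        case ExitCase.Canceled     => UIO(println("read cancelled"))
      }
      // Close in every case, ignoring errors raised by close()
      log >> Task(in.close()).onErrorHandle(_ => ())
    }
  acquire.bracketCase(use)(release)
}

// Usage with a throwaway file
val tmp = File.createTempFile("bio-demo", ".txt")
tmp.deleteOnExit()
Files.write(tmp.toPath, "first\nsecond".getBytes("UTF-8"))
val line: String = readFirstLineLogged(tmp).runSyncUnsafe()
```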
cats.effect.Resource
Resource is compatible with IO.
It lets us define a value that already carries its acquire and release logic, which makes it clear to users that they are not responsible for closing the resource.
Let's take a look at the example from the Cats-Effect docs, but written in IO:
import cats.effect.Resource
import monix.bio.{UIO, Task}
import monix.execution.Scheduler.Implicits.global
val acquire: UIO[String] = UIO(println("Acquire cats...")) >> UIO("cats")
val release: String => UIO[Unit] = _ => UIO(println("...release everything"))
val addDogs: String => UIO[String] = x =>
  UIO(println("...more animals...")) >> UIO.pure(x ++ " and dogs")
val report: String => Task[String] = x =>
  UIO(println("...produce weather report...")) >> UIO("It's raining " ++ x)
Resource.make(acquire)(release).evalMap(addDogs).use(report).runSyncUnsafe()
//=> Acquire cats...
//=> ...more animals...
//=> ...produce weather report...
//=> ...release everything
// Returns "It's raining cats and dogs"
The main drawback is that Resource.use requires the error type to be Throwable, but we can still work with typed errors for acquire and release.
Semantics
Error during finalizers
If release fails, the error is signalled as a terminal failure:
import monix.bio.{IO, UIO}
import monix.execution.Scheduler.Implicits.global
import monix.execution.exceptions.DummyException
val acquire = UIO("resource")
val use: String => IO[String, String] = s => IO.now(s)
val release: String => UIO[Unit] = _ => UIO(throw DummyException("unexpected error"))
val task: IO[String, String] = acquire.bracket(use)(release)
val result: Either[String, String] = task.attempt.runSyncUnsafe()
//=> Exception in thread "main" monix.execution.exceptions.DummyException: unexpected error
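Because the failure ends up in the terminal channel, attempt does not capture it. One way to observe it without throwing is sketched below. It assumes redeemCause and the Cause.Error / Cause.Termination constructors behave as described in the Monix BIO error handling documentation; the name observed is made up for this example.

```scala
import monix.bio.{Cause, IO, UIO}
import monix.execution.Scheduler.Implicits.global
import monix.execution.exceptions.DummyException

val acquire = UIO("resource")
val use: String => IO[String, String] = s => IO.now(s)
val release: String => UIO[Unit] = _ => UIO(throw DummyException("unexpected error"))

// Fold typed errors, terminal errors, and success into a single String
val observed: UIO[String] =
  acquire.bracket(use)(release).redeemCause[String](
    {
      case Cause.Error(e)       => s"typed error: $e"
      case Cause.Termination(t) => s"terminal error: ${t.getMessage}"
    },
    value => s"success: $value"
  )
```

Running observed should then report the failure of release as a terminal error rather than throwing it.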
If both use and release fail, the errors are merged via Platform.composeErrors and signalled as a terminal failure:
import monix.bio.{IO, UIO}
import monix.execution.Scheduler.Implicits.global
import monix.execution.exceptions.DummyException
val acquire = UIO("resource")
val use: String => IO[String, String] = s => IO.raiseError(s"I don't like $s")
val release: String => UIO[Unit] = _ => UIO(throw DummyException("unexpected error"))
val task: IO[String, String] = acquire.bracket(use)(release)
val result: Either[String, String] = task.attempt.runSyncUnsafe()
//=> Exception in thread "main" monix.execution.exceptions.UncaughtErrorException(I don't like resource)
//=> ...
//=> Suppressed: monix.execution.exceptions.DummyException: unexpected error
Nesting of finalizers
Finalizers can be nested, and multiple finalizers can be installed on the same task:
import monix.bio.UIO
import monix.execution.Scheduler.Implicits.global
UIO(println("action"))
  .guarantee(
    UIO(println("finalizer A")).guarantee(UIO(println("finalizer B")))
  )
  .guarantee(UIO(println("finalizer C")))
  .runSyncUnsafe()
//=> action
//=> finalizer A
//=> finalizer B
//=> finalizer C
All finalizers will be executed even if some of them fail:
import monix.bio.UIO
import monix.execution.Scheduler.Implicits.global
import monix.execution.exceptions.DummyException
UIO(println("action"))
  .guarantee(UIO(println("finalizer A")))
  .guarantee(UIO(throw DummyException("dummy")))
  .guarantee(UIO(println("finalizer C")))
  .runSyncUnsafe()
//=> action
//=> finalizer A
//=> finalizer C
//=> Exception in thread "main" monix.execution.exceptions.DummyException: dummy
Ordering guarantees in the face of cancellation
When IO is cancelled, the cancel method will not return until all finalizers have executed:
import monix.bio.{IO, UIO}
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._
val taskA = UIO(println("finished A"))
  .delayExecution(50.millis)
  .guarantee(UIO(println("finalized A")).delayExecution(100.millis))
val taskB = UIO(println("finished B"))
IO.race(taskA, taskB).flatMap(_ => UIO(println("Race Over"))).runSyncUnsafe()
//=> finished B
//=> finalized A
//=> Race Over
Corner case: cancel will not back-pressure on finalizers if the source IO is set to be uncancelable.
This is an unfortunate consequence of the current implementation. You can expect improvements in this area with Cats-Effect 3 support.