sealed abstract class IO[+E, +A] extends Serializable

Task represents a specification for a possibly lazy or asynchronous computation, which when executed will produce an A as a result, along with possible side-effects.

Compared with Future from Scala's standard library, Task does not represent a running computation or a value detached from time. Task does not execute anything when working with its builders or operators, and it does not submit any work to a thread pool; execution eventually takes place only after runAsync is called and not before that.

Note that Task is conservative in how it spawns logical threads. Transformations like map and flatMap for example will default to being executed on the logical thread on which the asynchronous computation was started. But one shouldn't make assumptions about how things will end up executed, as ultimately it is the implementation's job to decide on the best execution model. All you are guaranteed is asynchronous execution after executing runAsync.

Getting Started

To build an IO from by-name parameters (thunks), we can use IO.apply (alias IO.eval), monix.bio.IO.evalTotal if the thunk is guaranteed not to throw any exceptions, or IO.evalAsync:

val hello = IO("Hello ")
val world = IO.evalAsync("World!")

Nothing gets executed yet: IO is lazy, so nothing runs until you trigger its evaluation via runAsync or runToFuture.

To combine IO values we can use .map and .flatMap, which describe sequencing, and this time in a very real sense, because of the laziness involved:

val sayHello = hello
  .flatMap(h => world.map(w => h + w))
  .map(println)

This IO reference will trigger a side effect on evaluation, but not yet. To make the above print its message:

import monix.execution.CancelableFuture
import monix.execution.Scheduler.Implicits.global

val f = sayHello.runToFuture
// => Hello World!

The returned type is a CancelableFuture which inherits from Scala's standard Future, a value that can be completed already or might be completed at some point in the future, once the running asynchronous process finishes. Such a future value can also be canceled, see below.

Laziness, Purity and Referential Transparency

The fact that Task is lazy whereas Future is not has real consequences. For example with Task you can do this:

import scala.concurrent.duration._

def retryOnFailure[A](times: Int, source: Task[A]): Task[A] =
  source.onErrorHandleWith { err =>
    // No more retries left? Re-throw error:
    if (times <= 0) Task.raiseError(err) else {
      // Recursive call, yes we can!
      retryOnFailure(times - 1, source)
        // Adding 500 ms delay for good measure
        .delayExecution(500.millis)
    }
  }

Because Future is strict (essentially an eager value), its result gets "memoized" (meaning cached), whereas Task is basically a function that can be repeated as many times as you want.

Task is a pure data structure that can be used to describe pure functions, the equivalent of Haskell's IO.

Memoization

Task can also do memoization, making it behave like a "lazy" Scala Future: nothing is started until the first runAsync, at which point its side effects are evaluated, and the result is then reused on subsequent evaluations:

Task(println("boo")).memoize

The difference between this and just calling runAsync() is that memoize() still returns a Task and the actual memoization happens on the first runAsync() (with idempotency guarantees of course).
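As a minimal sketch of that idempotency guarantee (assuming the global Scheduler import shown earlier), evaluating a memoized task twice triggers its side effect only once:

import monix.execution.Scheduler.Implicits.global

val cached = Task(println("boo")).memoize

// "boo" is printed on the first evaluation only;
// subsequent runs reuse the cached result
cached.runToFuture
cached.runToFuture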

But here's something else that the Future data type cannot do, memoizeOnSuccess:

Task.eval {
  if (scala.util.Random.nextDouble() > 0.33)
    throw new RuntimeException("error!")
  println("moo")
}.memoizeOnSuccess

This keeps repeating the computation for as long as the result is a failure and caches it only on success. Yes we can!

WARNING: as awesome as memoize can be, use with care because memoization can break referential transparency!

Parallelism

Because of laziness, invoking IO.sequence will not work like it does for Future.sequence, the given Task values being evaluated one after another, in sequence, not in parallel. If you want parallelism, then you need to use IO.parSequence and thus be explicit about it.

This is great because it gives you the possibility of fine tuning the execution. For example, say you want to execute things in parallel, but with a maximum limit of 30 tasks being executed in parallel. One way of doing that is to process your list in batches:

// Some array of tasks, you come up with something good :-)
val list: Seq[Task[Int]] = Seq.tabulate(100)(Task(_))

// Split our list in chunks of 30 items per chunk,
// this being the maximum parallelism allowed
val chunks = list.sliding(30, 30).toSeq

// Specify that each batch should process stuff in parallel
val batchedTasks = chunks.map(chunk => Task.parSequence(chunk))
// Sequence the batches
val allBatches = Task.sequence(batchedTasks)

// Flatten the result, within the context of Task
val all: Task[Seq[Int]] = allBatches.map(_.flatten)

Note that the built Task reference is just a specification at this point, or you can view it as a function, as nothing has executed yet, you need to call runAsync or runToFuture explicitly.

Cancellation

The logic described by a Task could be cancelable, depending on how the Task gets built.

CancelableFuture references can also be canceled, in case the described computation can be canceled. When describing tasks with Task.eval nothing can be cancelled, since there's nothing about a plain function that you can cancel, but we can build cancelable tasks with IO.cancelable.

import scala.concurrent.duration._
import scala.util._

val delayedHello = Task.cancelable0[Unit] { (scheduler, callback) =>
  val task = scheduler.scheduleOnce(1.second) {
    println("Delayed Hello!")
    // Signaling successful completion
    callback(Success(()))
  }
  // Returning a cancel token that knows how to cancel the
  // scheduled computation:
  Task {
    println("Cancelling!")
    task.cancel()
  }
}

The sample above prints a message with a delay, where the delay itself is scheduled with the injected Scheduler. The Scheduler is in fact an implicit parameter to runAsync().

This action can be cancelled, because it specifies cancellation logic. In case we have no cancellation logic to express, it's OK to return an empty cancel token (e.g. Task.unit), in which case the resulting Task would not be cancelable.

But the Task we just described is cancelable, for one at the edge, because runAsync returns Cancelable and runToFuture returns CancelableFuture references:

// Triggering execution
val cf = delayedHello.runToFuture

// If we change our mind before the timespan has passed:
cf.cancel()

Cancellation is also described on Task as a pure action, which can be used, for example, in races:

import scala.concurrent.duration._
import scala.concurrent.TimeoutException

val ta = Task(1 + 1).delayExecution(4.seconds)

val tb = Task.raiseError[Int](new TimeoutException)
  .delayExecution(4.seconds)

Task.racePair(ta, tb).flatMap {
  case Left((a, fiberB)) =>
    fiberB.cancel.map(_ => a)
  case Right((fiberA, b)) =>
    fiberA.cancel.map(_ => b)
}

The returned type in racePair is Fiber, which is a data type that's meant to wrap tasks linked to an active process and that can be canceled or joined.

Also, given a task, we can specify actions that need to be triggered in case of cancellation, see doOnCancel:

val task = Task.eval(println("Hello!")).executeAsync

task doOnCancel IO.evalTotal {
  println("A cancellation attempt was made!")
}

Given a task, we can also create a new task from it that is atomic (non-cancelable), in the sense that either all of it executes or nothing at all, via uncancelable.
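For example, a minimal sketch of making a delayed task non-interruptible via uncancelable:

import scala.concurrent.duration._

// Once started, this delayed task can no longer be interrupted half-way
val atomic = Task(println("Hello!"))
  .delayExecution(3.seconds)
  .uncancelable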

Note on the ExecutionModel

Task is conservative in how it introduces async boundaries. Transformations like map and flatMap for example will default to being executed on the current call stack on which the asynchronous computation was started. But one shouldn't make assumptions about how things will end up executed, as ultimately it is the implementation's job to decide on the best execution model. All you are guaranteed (and can assume) is asynchronous execution after executing runAsync.

Currently the default ExecutionModel specifies batched execution, and Task respects the injected ExecutionModel in its evaluation. If you want a different behavior, you need to execute the Task reference with a different Scheduler.
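As an illustrative sketch (one option among several), a Task can be run with a Scheduler configured for a different execution model, for example forcing an asynchronous boundary on every step:

import monix.execution.Scheduler
import monix.execution.ExecutionModel.AlwaysAsyncExecution

// A Scheduler that forces an async boundary after every step
implicit val alwaysAsync: Scheduler =
  Scheduler.global.withExecutionModel(AlwaysAsyncExecution)

Task(1 + 1).map(_ + 1).runToFuture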

Source
IO.scala
Linear Supertypes
Serializable, AnyRef, Any

Value Members

  1. final def !=(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  2. final def ##: Int
    Definition Classes
    AnyRef → Any
  3. final def *>[E1 >: E, B](tb: IO[E1, B]): IO[E1, B]

    Runs this IO first and then, when successful, the given IO.

    Runs this IO first and then, when successful, the given IO. Returns the result of the given IO.

    Example:

    val combined = IO{println("first"); "first"} *> IO{println("second"); "second"}
    // Prints "first" and then "second"
    // Result value will be "second"

    As this method is strict, it can lead to an infinite loop / stack overflow for self-referring tasks.

    Annotations
    @inline()
    See also

    >> for the version with a non-strict parameter

  4. final def <*[E1 >: E, B](tb: IO[E1, B]): IO[E1, A]

    Runs this IO first and then, when successful, the given IO.

    Runs this IO first and then, when successful, the given IO. Returns the result of this IO.

    Example:

    val combined = IO{println("first"); "first"} <* IO{println("second"); "second"}
    // Prints "first" and then "second"
    // Result value will be "first"

    As this method is strict, it can lead to an infinite loop / stack overflow for self-referring tasks.

    Annotations
    @inline()
  5. final def ==(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  6. final def >>[E1 >: E, B](tb: => IO[E1, B]): IO[E1, B]

    Runs this task first and then, when successful, the given task.

    Runs this task first and then, when successful, the given task. Returns the result of the given task.

    Example:

    val combined = Task{println("first"); "first"} >> Task{println("second"); "second"}
    // Prints "first" and then "second"
    // Result value will be "second"
  7. final def absorb(implicit ev: <:<[E, Throwable]): Task[A]

    Absorbs all unexpected errors to typed error channel.

    Absorbs all unexpected errors to typed error channel.

    import monix.execution.exceptions.DummyException
    import monix.execution.Scheduler.Implicits.global
    
    val task: UIO[Either[Throwable, Int]] = IO
      .terminate(DummyException("boom!"))
      .absorb
      .attempt
    
    // Some(Success(Left(DummyException(boom!))))
    task.runToFuture.value
  8. final def absorbWith(f: (E) => Throwable): Task[A]

    Absorbs all unexpected errors to typed error channel.

    Absorbs all unexpected errors to typed error channel.

    import monix.execution.exceptions.DummyException
    import monix.execution.Scheduler.Implicits.global
    
    val task: UIO[Either[Throwable, Int]] = IO
      .raiseError("boom!")
      .absorbWith(e => DummyException(e))
      .attempt
    
    // Some(Success(Left(DummyException(boom!))))
    task.runToFuture.value
  9. final def as[B](value: B): IO[E, B]

    Replaces the A value in IO[E, A] with the supplied value.
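    For illustration, a minimal sketch that discards the source's result and replaces it with a constant:

    val done: UIO[String] = IO.evalTotal(42).as("done")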

  10. final def asInstanceOf[T0]: T0
    Definition Classes
    Any
  11. final def asyncBoundary(s: Scheduler): IO[E, A]

    Introduces an asynchronous boundary at the current stage in the asynchronous processing pipeline, making processing jump to the given Scheduler (until the next async boundary).

    Introduces an asynchronous boundary at the current stage in the asynchronous processing pipeline, making processing jump to the given Scheduler (until the next async boundary).

    Consider the following example:

    import monix.execution.Scheduler
    
    implicit val s = Scheduler.global
    val io = Scheduler.io()
    
    val source = IO(1) // s
      .asyncBoundary(io)
      .flatMap(_ => IO(2)) // io
      .flatMap(_ => IO(3)) // io
      .asyncBoundary
      .flatMap(_ => IO(4))  // s

    If Scheduler s is passed implicitly when running the IO, IO(1) will be executed there. Then it will switch to io for IO(2) and IO(3). asyncBoundary without any arguments returns to the default Scheduler so IO(4) will be executed there.

    s

    is the scheduler triggering the asynchronous boundary

  12. final def asyncBoundary: IO[E, A]

    Introduces an asynchronous boundary at the current stage in the asynchronous processing pipeline.

    Introduces an asynchronous boundary at the current stage in the asynchronous processing pipeline.

    The IO will be returned to the default Scheduler to reschedule the rest of its execution.

    Consider the following example:

    import monix.execution.ExecutionModel.SynchronousExecution
    import monix.execution.Scheduler
    
    val s = Scheduler.singleThread("example-scheduler").withExecutionModel(SynchronousExecution)
    
    val source1 = IO(println("task 1")).loopForever
    val source2 = IO(println("task 2")).loopForever
    
    // Will keep printing only "task 1" or "task 2"
    // depending on which one was scheduled first
    IO.parZip2(source1, source2)

    We might expect that both source1 and source2 would execute concurrently but since we are using only 1 thread with SynchronousExecution execution model, one of them will be scheduled first and then run forever.

    To prevent this behavior we could introduce asynchronous boundary after each iteration, i.e.:

    val source3 = IO(println("task 1")).asyncBoundary.loopForever
    val source4 = IO(println("task 2")).asyncBoundary.loopForever
    
    // Will keep printing "task 1" and "task 2" alternately.
    IO.parZip2(source3, source4)

    A lot of asynchronous boundaries can lead to unnecessary overhead so in the majority of cases it is enough to use the default ExecutionModel which introduces asynchronous boundaries between flatMap periodically on its own.

    The likelihood that different tasks are able to advance is called fairness.

    See also

    IO.executeOn for a way to override the default Scheduler

  13. final def attempt: UIO[Either[E, A]]

    Creates a new IO that will expose any triggered typed errors from the source.
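    For illustration, a minimal sketch: a typed error surfaces as a Left, so the resulting task itself cannot fail.

    val result: UIO[Either[String, Int]] =
      IO.raiseError("boom").attempt
    // On evaluation yields Left("boom")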

  14. final def bimap[E1, B](fe: (E) => E1, fa: (A) => B): IO[E1, B]

    Returns a new IO that applies the mapping function fa to the success channel and fe to the error channel.
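    For illustration, a minimal sketch that maps both channels:

    val source: IO[Int, String] = IO.raiseError(404)
    val mapped: IO[String, Int] =
      source.bimap(code => s"Error $code", text => text.length)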

  15. final def bracket[E1 >: E, B](use: (A) => IO[E1, B])(release: (A) => UIO[Unit]): IO[E1, B]

    Returns a task that treats the source task as the acquisition of a resource, which is then exploited by the use function and then released.

    Returns a task that treats the source task as the acquisition of a resource, which is then exploited by the use function and then released.

    The bracket operation is the equivalent of the try {} catch {} finally {} statements from mainstream languages.

    The bracket operation installs the necessary exception handler to release the resource in the event of an exception being raised during the computation, or in case of cancellation.

    If an exception is raised, then bracket will re-raise the exception after performing the release. If the resulting task gets cancelled, then bracket will still perform the release, but the yielded task will be non-terminating (equivalent with IO.never).

    Example:

    import java.io._
    
    def readFile(file: File): Task[String] = {
      // Opening a file handle for reading text
      val acquire = Task.eval(new BufferedReader(
        new InputStreamReader(new FileInputStream(file), "utf-8")
      ))
    
      acquire.bracket { in =>
        // Usage part
        Task.eval {
          // Yes, ugly Java, non-FP loop;
          // side-effects are suspended though
          val buff = new StringBuilder()
          var line: String = in.readLine()
          while (line != null) {
            buff.append(line)
            line = in.readLine()
          }
          buff.toString()
        }
      } { in =>
        // The release part
        UIO(in.close())
      }
    }

    Note that in case of cancellation the underlying implementation cannot guarantee that the computation described by use doesn't end up executed concurrently with the computation from release. In the example above that ugly Java loop might end up reading from a BufferedReader that is already closed due to the task being cancelled, thus triggering an error in the background with nowhere to go but in Scheduler.reportFailure.

    In this particular example, given that we are just reading from a file, it doesn't matter. But in other cases it might matter, as concurrency on top of the JVM when dealing with I/O might lead to corrupted data.

    For those cases you might want to do synchronization (e.g. usage of locks and semaphores) and you might want to use bracketE, the version that allows you to differentiate between normal termination and cancellation.

    NOTE on error handling: one big difference versus try {} finally {} is that, in case both the release function and the use function throw, the error raised by use gets signaled and the error raised by release gets reported with Scheduler.reportFailure.

    For example:

    IO.evalAsync("resource").bracket { _ =>
      // use
      IO.raiseError(new RuntimeException("Foo"))
    } { _ =>
      // release
      IO.terminate(new RuntimeException("Bar"))
    }

    In this case the error signaled downstream is "Foo", while the "Bar" error gets reported. This is consistent with the behavior of Haskell's bracket operation and NOT with try {} finally {} from Scala, Java or JavaScript.

    use

    is a function that evaluates the resource yielded by the source, yielding a result that will get generated by the task returned by this bracket function

    release

    is a function that gets called after use terminates, either normally or in error, or if it gets cancelled, receiving as input the resource that needs to be released

    See also

    bracketCase and bracketE

  16. final def bracketCase[E1 >: E, B](use: (A) => IO[E1, B])(release: (A, ExitCase[Cause[E1]]) => UIO[Unit]): IO[E1, B]

    Returns a new task that treats the source task as the acquisition of a resource, which is then exploited by the use function and then released, with the possibility of distinguishing between normal termination and cancelation, such that an appropriate release of resources can be executed.

    Returns a new task that treats the source task as the acquisition of a resource, which is then exploited by the use function and then released, with the possibility of distinguishing between normal termination and cancelation, such that an appropriate release of resources can be executed.

    The bracketCase operation is the equivalent of try {} catch {} finally {} statements from mainstream languages when used for the acquisition and release of resources.

    The bracketCase operation installs the necessary exception handler to release the resource in the event of an exception being raised during the computation, or in case of cancelation.

    In comparison with the simpler bracket version, this one allows the caller to differentiate between normal termination, termination in error and cancelation via an ExitCase parameter.

    use

    is a function that evaluates the resource yielded by the source, yielding a result that will get generated by this function on evaluation

    release

    is a function that gets called after use terminates, either normally or in error, or if it gets canceled, receiving as input the resource that needs release, along with the result of use (cancelation, error or successful result)

    See also

    bracket and bracketE

  17. final def bracketE[E1 >: E, B](use: (A) => IO[E1, B])(release: (A, Either[Option[Cause[E1]], B]) => UIO[Unit]): IO[E1, B]

    Returns a task that treats the source task as the acquisition of a resource, which is then exploited by the use function and then released, with the possibility of distinguishing between normal termination and cancellation, such that an appropriate release of resources can be executed.

    Returns a task that treats the source task as the acquisition of a resource, which is then exploited by the use function and then released, with the possibility of distinguishing between normal termination and cancellation, such that an appropriate release of resources can be executed.

    The bracketE operation is the equivalent of try {} catch {} finally {} statements from mainstream languages.

    The bracketE operation installs the necessary exception handler to release the resource in the event of an exception being raised during the computation, or in case of cancellation.

    In comparison with the simpler bracket version, this one allows the caller to differentiate between normal termination and cancellation.

    The release function receives as input:

    • Left(None) in case of cancellation
    • Left(Some(error)) in case use terminated with an error
    • Right(b) in case of success

    NOTE on error handling: one big difference versus try {} finally {} is that, in case both the release function and the use function throw, the error raised by use gets signaled and the error raised by release gets reported with Scheduler.reportFailure.

    For example:

    IO.evalAsync("resource").bracket { _ =>
      // use
      IO.raiseError(new RuntimeException("Foo"))
    } { _ =>
      // release
      IO.terminate(new RuntimeException("Bar"))
    }

    In this case the error signaled downstream is "Foo", while the "Bar" error gets reported. This is consistent with the behavior of Haskell's bracket operation and NOT with try {} finally {} from Scala, Java or JavaScript.

    use

    is a function that evaluates the resource yielded by the source, yielding a result that will get generated by this function on evaluation

    release

    is a function that gets called after use terminates, either normally or in error, or if it gets cancelled, receiving as input the resource that needs release, along with the result of use (cancellation, error or successful result)

    See also

    bracket and bracketCase

  18. def clone(): AnyRef
    Attributes
    protected[lang]
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.CloneNotSupportedException]) @native()
  19. final def delayExecution(timespan: FiniteDuration): IO[E, A]

    Returns a task that waits for the specified timespan before executing and mirroring the result of the source.

    Returns a task that waits for the specified timespan before executing and mirroring the result of the source.

    In this example we're printing to standard output, but before doing that we're introducing a 3 seconds delay:

    import scala.concurrent.duration._
    
    Task(println("Hello!"))
      .delayExecution(3.seconds)

    This operation is also equivalent with:

    Task.sleep(3.seconds).flatMap(_ => Task(println("Hello!")))

    See IO.sleep for the operation that describes the effect and IO.delayResult for the version that evaluates the task on time, but delays the signaling of the result.

    timespan

    is the time span to wait before triggering the evaluation of the task

  20. final def delayResult(timespan: FiniteDuration): IO[E, A]

    Returns a task that executes the source immediately on runAsync, but delays signaling of the successful (onSuccess) result by the specified duration.

    Returns a task that executes the source immediately on runAsync, but delays signaling of the successful (onSuccess) result by the specified duration.

    Note that if an error happens, then it is streamed immediately with no delay.

    See delayExecution for delaying the evaluation of the task with the specified duration. The delayResult operation is effectively equivalent with:

    import scala.concurrent.duration._
    
    Task(1 + 1)
      .flatMap(a => Task.now(a).delayExecution(3.seconds))

    Or if we are to use the IO.sleep describing just the effect, this operation is equivalent with:

    Task(1 + 1).flatMap(a => Task.sleep(3.seconds).map(_ => a))

    Thus in this example 3 seconds will pass before the result is generated by the source, plus another 5 seconds before it is finally emitted:

    Task(1 + 1)
      .delayExecution(3.seconds)
      .delayResult(5.seconds)
    timespan

    is the time span to sleep before signaling the result, but after the evaluation of the source

  21. final def dematerialize[B](implicit evE: <:<[E, Nothing], evA: <:<[A, Try[B]]): Task[B]

    Dematerializes the source's result from a Try.

  22. final def doOnCancel(callback: UIO[Unit]): IO[E, A]

    Returns a new Task that will mirror the source, but that will execute the given callback if the task gets canceled before completion.

    Returns a new Task that will mirror the source, but that will execute the given callback if the task gets canceled before completion.

    This only works for premature cancellation. See doOnFinish for triggering callbacks when the source finishes.

    callback

    is the callback to execute if the task gets canceled prematurely

  23. final def doOnFinish(f: (Option[Cause[E]]) => UIO[Unit]): IO[E, A]

    Returns a new IO in which f is scheduled to be run on completion.

    Returns a new IO in which f is scheduled to be run on completion. This would typically be used to release any resources acquired by this IO.

    The returned IO completes when both the source and the task returned by f complete.

    NOTE: The given function is only called when the task is complete. However the function does not get called if the task gets canceled. Cancellation is a process that's concurrent with the execution of a task and hence needs special handling.

    See doOnCancel for specifying a callback to call on canceling a task.
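    For illustration, a minimal sketch that logs how the source finished:

    IO(1 + 1).doOnFinish {
      case None        => UIO(println("finished successfully"))
      case Some(cause) => UIO(println(s"finished with error: $cause"))
    }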

  24. final def eq(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  25. def equals(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef → Any
  26. final def executeAsync: IO[E, A]

    Mirrors the given source Task, but upon execution ensures that evaluation forks into a separate (logical) thread.

    Mirrors the given source Task, but upon execution ensures that evaluation forks into a separate (logical) thread.

    The Scheduler used will be the one that is used to start the run-loop in IO.runAsync or IO.runToFuture.

    This operation is equivalent with:

    Task.shift.flatMap(_ => Task(1 + 1))
    
    // ... or ...
    
    import cats.syntax.all._
    
    Task.shift *> Task(1 + 1)

    The Scheduler used for scheduling the async boundary will be the default, meaning the one used to start the run-loop in runAsync.

  27. final def executeOn(s: Scheduler, forceAsync: Boolean = true): IO[E, A]

    Overrides the default Scheduler, possibly forcing an asynchronous boundary before execution (if forceAsync is set to true, the default).

    Overrides the default Scheduler, possibly forcing an asynchronous boundary before execution (if forceAsync is set to true, the default).

    When a Task is executed with IO.runAsync or IO.runToFuture, it needs a Scheduler, which is going to be injected in all asynchronous tasks processed within the flatMap chain, a Scheduler that is used to manage asynchronous boundaries and delayed execution.

    This scheduler passed in runAsync is said to be the "default" and executeOn overrides that default.

    import monix.execution.Scheduler
    import java.io.{BufferedReader, FileInputStream, InputStreamReader}
    
    /** Reads the contents of a file using blocking I/O. */
    def readFile(path: String): Task[String] = Task.eval {
      val in = new BufferedReader(
        new InputStreamReader(new FileInputStream(path), "utf-8"))
    
      val buffer = new StringBuffer()
      var line: String = in.readLine()
      while (line != null) {
        buffer.append(line)
        line = in.readLine()
      }
    
      buffer.toString
    }
    
    // Building a Scheduler meant for blocking I/O
    val io = Scheduler.io()
    
    // Building the Task reference, specifying that `io` should be
    // injected as the Scheduler for managing async boundaries
    readFile("path/to/file").executeOn(io, forceAsync = true)

    In this example we are using IO.eval, which executes the given thunk immediately (on the current thread and call stack).

    By calling executeOn(io), we are ensuring that the used Scheduler (injected in IO.cancelable0) will be io, a Scheduler that we intend to use for blocking I/O actions. And we are also forcing an asynchronous boundary right before execution, by passing the forceAsync parameter as true (which happens to be the default value).

    Thus, for our described function that reads files using Java's blocking I/O APIs, we are ensuring that execution is entirely managed by an io scheduler, executing that logic on a thread pool meant for blocking I/O actions.

    Note that in case forceAsync = false, the invocation will not introduce any async boundaries of its own and will not ensure that execution actually happens on the given Scheduler, this depending on the implementation of the Task. For example:

    Task.eval("Hello, " + "World!")
      .executeOn(io, forceAsync = false)

    The evaluation of this task will probably happen immediately (depending on the configured ExecutionModel) and the given scheduler will probably not be used at all.

    However in case we would use IO.evalAsync, which ensures that execution of the provided thunk will be async, then by using executeOn we'll indeed get a logical fork on the io scheduler:

    IO.evalAsync("Hello, " + "World!").executeOn(io, forceAsync = false)

    Also note that overriding the "default" scheduler can only happen once, because it's only the "default" that can be overridden.

    Something like this won't have the desired effect:

    val io1 = Scheduler.io()
    val io2 = Scheduler.io()
    
    Task(1 + 1).executeOn(io1).executeOn(io2)

    In this example the implementation of task will receive the reference to io1 and will use it on evaluation, while the second invocation of executeOn will create an unnecessary async boundary (if forceAsync = true) or be basically a costly no-op. This might be confusing but consider the equivalence to these functions:

    import scala.concurrent.ExecutionContext
    
    val io11 = Scheduler.io()
    val io22 = Scheduler.io()
    
    def sayHello(ec: ExecutionContext): Unit =
      ec.execute(new Runnable {
        def run() = println("Hello!")
      })
    
    def sayHello2(ec: ExecutionContext): Unit =
      // Overriding the default `ec`!
      sayHello(io11)
    
    def sayHello3(ec: ExecutionContext): Unit =
      // Overriding the default no longer has the desired effect
      // because sayHello2 is ignoring it!
      sayHello2(io22)
    s

    is the Scheduler to use for overriding the default scheduler and for forcing an asynchronous boundary if forceAsync is true

    forceAsync

    indicates whether an asynchronous boundary should be forced right before the evaluation of the Task, managed by the provided Scheduler

    returns

    a new Task that mirrors the source on evaluation, but that uses the provided scheduler for overriding the default and possibly force an extra asynchronous boundary on execution

  28. final def executeWithModel(em: ExecutionModel): IO[E, A]

    Returns a new task that will execute the source with a different ExecutionModel.

    Returns a new task that will execute the source with a different ExecutionModel.

    This allows fine-tuning the options injected by the scheduler locally. Example:

    import monix.execution.ExecutionModel.AlwaysAsyncExecution
    Task(1 + 1).executeWithModel(AlwaysAsyncExecution)
    em

    is the ExecutionModel with which the source will get evaluated on runAsync

  29. final def executeWithOptions(f: (Options) => Options): IO[E, A]

    Returns a new task that will execute the source with a different set of Options.

    Returns a new task that will execute the source with a different set of Options.

    This allows fine-tuning the default options. Example:

    Task(1 + 1).executeWithOptions(_.enableAutoCancelableRunLoops)
    f

    is a function that takes the source's current set of options and returns a modified set of options that will be used to execute the source upon runAsync

  30. final def failed: UIO[E]

    Returns a failed projection of this task.

    Returns a failed projection of this task.

    The failed projection is an IO holding a value of type E, emitting the error yielded by the source, in case the source fails, otherwise if the source succeeds the result will fail with a NoSuchElementException.
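    For illustration, a minimal sketch:

    val error: UIO[String] = IO.raiseError("boom").failed
    // On evaluation yields "boom" as a successful result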

  31. def finalize(): Unit
    Attributes
    protected[lang]
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.Throwable])
  32. final def flatMap[E1 >: E, B](f: (A) => IO[E1, B]): IO[E1, B]

    Creates a new Task by applying a function to the successful result of the source Task, and returns a task equivalent to the result of the function.
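    For illustration, a minimal sketch of sequencing two tasks:

    val fa = IO(1 + 1)
    val result = fa.flatMap(a => IO(a * 10))
    // On evaluation yields 20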

  33. final def flatMapLoop[E1 >: E, S](seed: S)(f: (A, S, (S) => IO[E1, S]) => IO[E1, S]): IO[E1, S]

    Describes flatMap-driven loops, as an alternative to recursive functions.

    Describes flatMap-driven loops, as an alternative to recursive functions.

    Sample:

    import scala.util.Random
    
    val random = IO(Random.nextInt())
    val loop = random.flatMapLoop(Vector.empty[Int]) { (a, list, continue) =>
      val newList = list :+ a
      if (newList.length < 5)
        continue(newList)
      else
        IO.now(newList)
    }
    seed

    initializes the result of the loop

    f

    is the function that updates the result on each iteration, returning an IO.

    returns

    a new IO that contains the result of the loop.

  34. final def flatten[E1 >: E, B](implicit ev: <:<[A, IO[E1, B]]): IO[E1, B]

    Given a source Task that emits another Task, this function flattens the result, returning a Task equivalent to the emitted Task by the source.

  35. final def flip: IO[A, E]

    Creates a new IO by swapping the error and value parameters.

    Creates a new IO by swapping the error and value parameters. This allows you to work with the error in a right-biased context, allowing you to apply a series of operations that may depend on the error thrown by this task.

    Example:

    import java.time.Instant
    
    case class ErrorA(i: Int)
    case class ErrorB(errA: ErrorA, createdAt: Instant)
    
    def mapToErrorB(error: ErrorA): Task[ErrorB] = ???
    def logToStdErr(error: ErrorB): Task[ErrorB] = ???
    def logErrorToFile(error: ErrorB): Task[ErrorB] = ???
    
    val f1 = IO.raiseError(ErrorA(500))
    
    for {
      errorA <- f1.flip
      errorB <- mapToErrorB(errorA)
      _      <- logToStdErr(errorB)
      _      <- logErrorToFile(errorB)
    } yield ()
  36. final def flipWith[E1, A1](f: (IO[A, E]) => IO[A1, E1]): IO[E1, A1]

    This function implements a common pattern with flip in that it returns the already flipped task and allows applying a series of operations that may depend on the error thrown by this task, before flipping the error and value parameters back.

    This function implements a common pattern with flip in that it returns the already flipped task and allows applying a series of operations that may depend on the error thrown by this task, before flipping the error and value parameters back.

    Example:

    import java.time.Instant
    
    case class ErrorA(i: Int)
    case class ErrorB(errA: ErrorA, createdAt: Instant)
    
    def mapToErrorB(error: ErrorA): Task[ErrorB] = ???
    def logToStdErr(error: ErrorB): Task[ErrorB] = ???
    def logErrorToFile(error: ErrorB): Task[ErrorB] = ???
    
    val f1 = IO.raiseError(ErrorA(500)).flipWith { f1 =>
      for {
        errorA <- f1
        errorB <- mapToErrorB(errorA)
        _      <- logToStdErr(errorB)
        _      <- logErrorToFile(errorB)
      } yield errorB
    }
  37. final def foreach(f: (Either[E, A]) => Unit)(implicit s: Scheduler): Unit

    Triggers the evaluation of the source, executing the given function for the generated element.

    Triggers the evaluation of the source, executing the given function for the generated element.

    The application of this function has strict behavior, as the task is immediately executed.

    Exceptions in f are reported using the provided (implicit) Scheduler.

    Annotations
    @UnsafeBecauseImpure()
  38. final def foreachL(f: (A) => Unit): IO[E, Unit]

    Returns a new task that upon evaluation will execute the given function for the generated element, transforming the source into an IO[E, Unit].

    Returns a new task that upon evaluation will execute the given function for the generated element, transforming the source into an IO[E, Unit].

    Similar in spirit with normal foreach, but lazy, as obviously nothing gets executed at this point.

  39. final def getClass(): Class[_ <: AnyRef]
    Definition Classes
    AnyRef → Any
    Annotations
    @native()
  40. final def guarantee(finalizer: UIO[Unit]): IO[E, A]

    Executes the given finalizer when the source is finished, either in success or in error, or if canceled.

    Executes the given finalizer when the source is finished, either in success or in error, or if canceled.

    This variant of guaranteeCase evaluates the given finalizer regardless of how the source gets terminated:

    • normal completion
    • completion in error
    • cancellation

    As best practice, it's not a good idea to release resources via guaranteeCase in polymorphic code. Prefer bracket for the acquisition and release of resources.

    See also

    guaranteeCase for the version that can discriminate between termination conditions

    bracket for the more general operation

  41. final def guaranteeCase(finalizer: (ExitCase[Cause[E]]) => UIO[Unit]): IO[E, A]

    Executes the given finalizer when the source is finished, either in success or in error, or if canceled, allowing for differentiating between exit conditions.

    Executes the given finalizer when the source is finished, either in success or in error, or if canceled, allowing for differentiating between exit conditions.

    This variant of guarantee injects an ExitCase in the provided function, allowing one to make a difference between:

    • normal completion
    • completion in error
    • cancellation

    As best practice, it's not a good idea to release resources via guaranteeCase in polymorphic code. Prefer bracketCase for the acquisition and release of resources.

    See also

    guarantee for the simpler version

    bracketCase for the more general operation

  42. def hashCode(): Int
    Definition Classes
    AnyRef → Any
    Annotations
    @native()
  43. final def hideErrors(implicit E: <:<[E, Throwable]): UIO[A]

    Hides all errors from the return type and raises them in the internal channel.

    Hides all errors from the return type and raises them in the internal channel.

    Use if you have a method that returns a possible error but you can't recover from it anyway and do not want to drag it everywhere.

    import monix.execution.exceptions.DummyException
    import monix.execution.Scheduler.Implicits.global
    
    val task: UIO[Int] = IO
      .raiseError(DummyException("boom!"))
      .hideErrors
      .map(_ => 10)
    
    // Some(Failure(DummyException(boom!)))
    task.runToFuture.value
  44. final def hideErrorsWith(f: (E) => Throwable): UIO[A]

    Hides all errors from the return type and raises them in the internal channel, using supplied function to transform E into Throwable.

    Hides all errors from the return type and raises them in the internal channel, using supplied function to transform E into Throwable.

    Use if you have a method that returns a possible error but you can't recover from it anyway and do not want to drag it everywhere.

    import monix.execution.exceptions.DummyException
    import monix.execution.Scheduler.Implicits.global
    
    val task: UIO[Int] = IO
      .raiseError("boom!")
      .hideErrorsWith(e => DummyException(e))
      .map(_ => 10)
    
    // Some(Failure(DummyException(boom!)))
    task.runToFuture.value
  45. final def isInstanceOf[T0]: Boolean
    Definition Classes
    Any
  46. final def loopForever: IO[E, Nothing]

    Returns a new Task that repeatedly executes the source as long as it continues to succeed.

    Returns a new Task that repeatedly executes the source as long as it continues to succeed. It never produces a terminal value.

    Example:

    import scala.concurrent.duration._
    
    Task.eval(println("Tick!"))
      .delayExecution(1.second)
      .loopForever
  47. final def map[B](f: (A) => B): IO[E, B]

    Returns a new Task that applies the mapping function to the element emitted by the source.

    Returns a new Task that applies the mapping function to the element emitted by the source.

    Can be used for specifying a (lazy) transformation to the result of the source.

    This equivalence with flatMap always holds:

    fa.map(f) <-> fa.flatMap(x => Task.pure(f(x)))

  48. final def mapError[E1](f: (E) => E1): IO[E1, A]

    Creates a new task that will transform errors using the supplied function f.

    Creates a new task that will transform errors using the supplied function f.

    Example:

    import java.time.Instant
    
    case class ErrorA(i: Int)
    case class ErrorB(errA: ErrorA, createdAt: Instant)
    
    val task1: IO[ErrorA, String] = IO.raiseError(ErrorA(10))
    val task2: IO[ErrorB, String] = task1.mapError(errA => ErrorB(errA, Instant.now()))
  49. final def mapErrorPartial[E1](pf: PartialFunction[E, E1])(implicit E: <:<[E, Throwable]): IO[E1, A]

    Use this method to lift a subset of errors into domain-specific errors.

    Use this method to lift a subset of errors into domain-specific errors. Errors that are not matched will be considered non-recoverable and will terminate the IO.

    Example:

    sealed trait DomainError
    case class ErrorA(message: String) extends DomainError
    
    val io: IO[DomainError, Int] = IO("1".toInt).mapErrorPartial {
      case ex: NumberFormatException => ErrorA(s"Invalid input: ${ex.getMessage}")
    }

    See mapError for the version that takes a total function.

  50. final def mapErrorPartialWith[E1, B >: A](pf: PartialFunction[E, IO[E1, B]])(implicit E: <:<[E, Throwable]): IO[E1, B]

    Use this method to lift a subset of errors into domain-specific errors.

    Use this method to lift a subset of errors into domain-specific errors. Errors that are not matched will be considered non-recoverable and will terminate the IO.

    Example:

    sealed trait DomainError
    case class ErrorA(message: String) extends DomainError
    
    val io: IO[DomainError, Int] = IO("1".toInt).mapErrorPartialWith {
      case ex: NumberFormatException => IO.raiseError(ErrorA(s"Invalid input: ${ex.getMessage}"))
    }
  51. final def materialize(implicit ev: <:<[E, Throwable]): UIO[Try[A]]

    Creates a new Task that will expose any triggered error from the source.
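    For illustration, a minimal sketch that lifts a failure into a Try value:

    import scala.util.Try

    val lifted: UIO[Try[Int]] = Task(1 / 0).materialize
    // On evaluation yields a Failure(ArithmeticException)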

  52. final def memoize: IO[E, A]

    Memoizes (caches) the result of the source task and reuses it on subsequent invocations of runAsync.

    Memoizes (caches) the result of the source task and reuses it on subsequent invocations of runAsync.

    The resulting task will be idempotent, meaning that evaluating the resulting task multiple times will have the same effect as evaluating it once.

    Cancellation — a memoized task will mirror the behavior of the source on cancellation. This means that:

    • if the source isn't cancellable, then the resulting memoized task won't be cancellable either
    • if the source is cancellable, then the memoized task can be cancelled, which can take unprepared users by surprise

    Depending on use-case, there are two ways to ensure no surprises:

    • usage of onCancelRaiseError, before applying memoization, to ensure that on cancellation an error is triggered and then noticed by the memoization logic
    • usage of uncancelable, either before or after applying memoization, to ensure that the memoized task cannot be cancelled

    Example:

    import scala.concurrent.CancellationException
    import scala.concurrent.duration._
    
    val source = Task(1).delayExecution(5.seconds)
    
    // Option 1: trigger error on cancellation
    val err = new CancellationException
    val cached1 = source.onCancelRaiseError(err).memoize
    
    // Option 2: make it uninterruptible
    val cached2 = source.uncancelable.memoize

    When using onCancelRaiseError like in the example above, the behavior of memoize is to cache the error. If you want the ability to retry errors until a successful value happens, see memoizeOnSuccess.

    UNSAFE — this operation allocates a shared, mutable reference, which can, in certain cases, break referential transparency, even if this operation guarantees idempotency (i.e. referential transparency implies idempotency, but idempotency does not imply referential transparency).

    The allocation of a mutable reference is known to be a side effect, thus breaking referential transparency, even if calling this method does not trigger the evaluation of side effects suspended by the source.

    Use with care. Sometimes it's easier to just keep a shared, memoized reference to some connection, but keep in mind it might be better to pass such a reference around as a parameter.

    returns

    a Task that can be used to wait for the memoized value

    Annotations
    @UnsafeBecauseImpure()
    See also

    memoizeOnSuccess for a version that only caches successful results

  53. final def memoizeOnSuccess: IO[E, A]

    Memoizes (caches) the successful result of the source task and reuses it on subsequent invocations of runAsync.

    Memoizes (caches) the successful result of the source task and reuses it on subsequent invocations of runAsync. Thrown exceptions are not cached.

    The resulting task will be idempotent, but only if the result is successful.

    Cancellation — a memoized task will mirror the behavior of the source on cancellation. This means that:

    • if the source isn't cancellable, then the resulting memoized task won't be cancellable either
    • if the source is cancellable, then the memoized task can be cancelled, which can take unprepared users by surprise

    Depending on use-case, there are two ways to ensure no surprises:

    • usage of onCancelRaiseError, before applying memoization, to ensure that on cancellation an error is triggered and then noticed by the memoization logic
    • usage of uncancelable, either before or after applying memoization, to ensure that the memoized task cannot be cancelled

    Example:

    import scala.concurrent.CancellationException
    import scala.concurrent.duration._
    
    val source = Task(1).delayExecution(5.seconds)
    
    // Option 1: trigger error on cancellation
    val err = new CancellationException
    val cached1 = source.onCancelRaiseError(err).memoizeOnSuccess
    
    // Option 2: make it uninterruptible
    val cached2 = source.uncancelable.memoizeOnSuccess

    When using onCancelRaiseError like in the example above, the behavior of memoizeOnSuccess is to retry the source on subsequent invocations. Use memoize if that's not the desired behavior.

    UNSAFE — this operation allocates a shared, mutable reference, which can, in certain cases, break referential transparency, even if this operation guarantees idempotency (i.e. referential transparency implies idempotency, but idempotency does not imply referential transparency).

    The allocation of a mutable reference is known to be a side effect, thus breaking referential transparency, even if calling this method does not trigger the evaluation of side effects suspended by the source.

    Use with care. Sometimes it's easier to just keep a shared, memoized reference to some connection, but keep in mind it might be better to pass such a reference around as a parameter.

    returns

    a Task that can be used to wait for the memoized value

    Annotations
    @UnsafeBecauseImpure()
    See also

    memoize for a version that caches both successful results and failures

  54. final def ne(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  55. final def notify(): Unit
    Definition Classes
    AnyRef
    Annotations
    @native()
  56. final def notifyAll(): Unit
    Definition Classes
    AnyRef
    Annotations
    @native()
  57. final def onCancelRaiseError[E1 >: E](e: E1): IO[E1, A]

    Returns a new task that mirrors the source task for normal termination, but that triggers the given error on cancellation.

    Returns a new task that mirrors the source task for normal termination, but that triggers the given error on cancellation.

    Normally tasks that are cancelled become non-terminating. Here's an example of a cancelable task:

    import monix.execution.Scheduler.Implicits.global
    import scala.concurrent.duration._
    import java.util.concurrent.TimeoutException
    
    val tenSecs = IO.sleep(10.seconds)
    val task1 = tenSecs.start.flatMap { fa =>
      // Triggering pure cancellation, then trying to get its result
      fa.cancel.flatMap(_ => tenSecs)
    }
    
    task1.timeoutWith(10.seconds, new TimeoutException()).runToFuture
    //=> TimeoutException

    In general you can expect cancelable tasks to become non-terminating on cancellation.

    This onCancelRaiseError operator transforms a task that would yield IO.never on cancellation into one that yields IO.raiseError.

    Example:

    import java.util.concurrent.CancellationException
    
    val anotherTenSecs = IO.sleep(10.seconds)
      .onCancelRaiseError(new CancellationException)
    
    val task2 = anotherTenSecs.start.flatMap { fa =>
      // Triggering pure cancellation, then trying to get its result
      fa.cancel.flatMap(_ => anotherTenSecs)
    }
    
    task2.runToFuture
    // => CancellationException
  58. final def onErrorFallbackTo[E1, B >: A](that: IO[E1, B]): IO[E1, B]

    Creates a new task that in case of error will fallback to the given backup task.
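    For illustration, a minimal sketch:

    val primary: IO[String, Int] = IO.raiseError("boom")
    val resilient: UIO[Int] = primary.onErrorFallbackTo(IO.now(42))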

  59. final def onErrorHandle[U >: A](f: (E) => U): UIO[U]

    Creates a new task that will handle any matching throwable that this task might emit.

    Creates a new task that will handle any matching throwable that this task might emit.

    See onErrorRecover for the version that takes a partial function.
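    For illustration, a minimal sketch that recovers with a default value:

    val recovered: UIO[Int] = IO.raiseError("boom").onErrorHandle(_ => -1)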

  60. final def onErrorHandleWith[E1, B >: A](f: (E) => IO[E1, B]): IO[E1, B]

    Creates a new task that will handle any matching throwable that this task might emit by executing another task.

    Creates a new task that will handle any matching throwable that this task might emit by executing another task.

    See onErrorRecoverWith for the version that takes a partial function.

  61. final def onErrorRecover[E1 >: E, U >: A](pf: PartialFunction[E, U]): IO[E1, U]

    Creates a new task that on error will try to map the error to another value using the provided partial function.

    Creates a new task that on error will try to map the error to another value using the provided partial function.

    See onErrorHandle for the version that takes a total function.

  62. final def onErrorRecoverWith[E1 >: E, B >: A](pf: PartialFunction[E, IO[E1, B]]): IO[E1, B]

    Creates a new task that will try recovering from an error by matching it with another task using the given partial function.

    Creates a new task that will try recovering from an error by matching it with another task using the given partial function.

    See onErrorHandleWith for the version that takes a total function.

  63. final def onErrorRestart(maxRetries: Long): IO[E, A]

    Creates a new task that in case of error will retry executing the source again and again, until it succeeds.

    Creates a new task that in case of error will retry executing the source again and again, until it succeeds.

    In case of continuous failure the total number of executions will be maxRetries + 1.
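    For illustration, a minimal sketch that retries a flaky computation at most 3 times:

    import scala.util.Random

    val flaky = IO.eval {
      if (Random.nextBoolean()) throw new RuntimeException("boom")
      "ok"
    }
    val resilient = flaky.onErrorRestart(3)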

  64. final def onErrorRestartIf(p: (E) => Boolean): IO[E, A]

    Creates a new task that in case of error will retry executing the source again and again, until it succeeds, or until the given predicate returns false.

    Creates a new task that in case of error will retry executing the source again and again, until it succeeds, or until the given predicate returns false.

    In this sample we retry for as long as the error is a TimeoutException:

    import scala.concurrent.TimeoutException
    
    Task("some long call that may timeout").onErrorRestartIf {
      case _: TimeoutException => true
      case _ => false
    }
    p

    is the predicate that is executed if an error is thrown and that keeps restarting the source for as long as it returns true

  65. final def onErrorRestartLoop[S, E1 >: E, B >: A](initial: S)(f: (E1, S, (S) => IO[E1, B]) => IO[E1, B]): IO[E1, B]

    On error restarts the source with a customizable restart loop.

    On error restarts the source with a customizable restart loop.

    This operation keeps an internal state, starting with an initial value, a state that gets evolved and based on which the next step gets decided, e.g. should it restart, maybe with a delay, or should it give up and re-throw the current error.

    Example that implements a simple retry policy, retrying for a maximum of 10 times before giving up and introducing a 1 second delay before each retry:

    import scala.util.Random
    import scala.concurrent.duration._
    
    val task = Task {
      if (Random.nextInt(20) > 10)
        throw new RuntimeException("boo")
      else 78
    }
    
    task.onErrorRestartLoop(10) { (err, maxRetries, retry) =>
      if (maxRetries > 0)
        // Next retry please; but do a 1 second delay
        retry(maxRetries - 1).delayExecution(1.second)
      else
        // No retries left, rethrow the error
        Task.raiseError(err)
    }

    A more complex exponential back-off sample:

    import scala.concurrent.duration._
    
    // Keeps the current state, indicating the restart delay and the
    // maximum number of retries left
    final case class Backoff(maxRetries: Int, delay: FiniteDuration)
    
    // Restarts for a maximum of 10 times, with an initial delay of 1 second,
    // a delay that keeps being multiplied by 2
    task.onErrorRestartLoop(Backoff(10, 1.second)) { (err, state, retry) =>
      val Backoff(maxRetries, delay) = state
      if (maxRetries > 0)
        retry(Backoff(maxRetries - 1, delay * 2)).delayExecution(delay)
      else
        // No retries left, rethrow the error
        Task.raiseError(err)
    }

    The given function injects the following parameters:

    1. the error reference that was thrown
    2. the current state, based on which a decision for the retry is made
    3. retry: S => IO[E, B], a function that schedules the next retry
    initial

    is the initial state used to determine the next on error retry cycle

    f

    is a function that injects the current error, state, a function that can signal a retry is to be made and returns the next task

  66. def redeem[B](recover: (E) => B, map: (A) => B): UIO[B]

    Returns a new value that transforms the result of the source, given the recover or map functions, which get executed depending on whether the result is successful or if it ends in error.

    Returns a new value that transforms the result of the source, given the recover or map functions, which get executed depending on whether the result is successful or if it ends in error.

    This is an optimization on usage of attempt and map, this equivalence being true:

    task.redeem(recover, map) <-> task.attempt.map(_.fold(recover, map))

    Usage of redeem subsumes onErrorHandle because:

    task.redeem(fe, id) <-> task.onErrorHandle(fe)

    recover

    is a function used for error recovery in case the source ends in error

    map

    is a function used for mapping the result of the source in case it ends in success

  67. final def redeemCause[B](recover: (Cause[E]) => B, map: (A) => B): UIO[B]

    Returns a new value that transforms the result of the source, given the recover or map functions, which get executed depending on whether the result is successful or if it ends in a fatal (untyped) error.

    recover

    is a function used for error recovery in case the source ends in error

    map

    is a function used for mapping the result of the source in case it ends in success

    See also

    IO.redeem for a version which works on typed errors

  68. final def redeemCauseWith[E1, B](recover: (Cause[E]) => IO[E1, B], bind: (A) => IO[E1, B]): IO[E1, B]

    Returns a new value that transforms the result of the source, given the recover or bind functions, which get executed depending on whether the result is successful or if it ends in a fatal (untyped) error.

    Usage of redeemCauseWith also subsumes flatMap because:

    task.redeemCauseWith(IO.raiseError, fs) <-> task.flatMap(fs)

    recover

    is the function that gets called to recover the source in case of error

    bind

    is the function that gets to transform the source in case of success

    See also

    IO.redeemWith for a version which only works on typed errors
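
    As a minimal illustration (assuming a task with a String typed error), both the typed and the fatal channel can be folded away:

    import monix.bio.{Cause, IO, UIO}
    
    val task: IO[String, Int] = IO.raiseError("boom")
    
    // Recovers typed errors with 0, fatal errors with -1,
    // and increments the value on success
    val handled: UIO[Int] =
      task.redeemCauseWith(
        {
          case Cause.Error(_)       => IO.now(0)
          case Cause.Termination(_) => IO.now(-1)
        },
        value => IO.now(value + 1)
      )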

  69. def redeemWith[E1, B](recover: (E) => IO[E1, B], bind: (A) => IO[E1, B]): IO[E1, B]

    Returns a new value that transforms the result of the source, given the recover or bind functions, which get executed depending on whether the result is successful or if it ends in error.

    This is an optimization on usage of attempt and flatMap, this equivalence being available:

    task.redeemWith(recover, bind) <-> task.attempt.flatMap(_.fold(recover, bind))

    Usage of redeemWith subsumes onErrorHandleWith because:

    task.redeemWith(fe, F.pure) <-> task.onErrorHandleWith(fe)

    Usage of redeemWith also subsumes flatMap because:

    task.redeemWith(Task.raiseError, fs) <-> task.flatMap(fs)

    recover

    is the function that gets called to recover the source in case of error

    bind

    is the function that gets to transform the source in case of success
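
    For illustration, a minimal sketch that recovers with a pure fallback and transforms the successful value (the val names are hypothetical):

    import monix.bio.{IO, UIO}
    
    val task: IO[String, Int] = IO.raiseError("boom")
    
    // Recovery returns a pure fallback, success gets doubled;
    // since both branches are UIO, the typed error goes away
    val handled: UIO[Int] =
      task.redeemWith(_ => IO.now(0), value => IO.now(value * 2))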

  70. final def restartUntil(p: (A) => Boolean): IO[E, A]

    Given a predicate function, keep retrying the IO until the function returns true.
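
    A small sketch, assuming a side-effectful source that is re-evaluated on every restart:

    import monix.bio.Task
    import scala.util.Random
    
    // Keeps re-running the thunk until an even number shows up
    val firstEven: Task[Int] =
      Task(Random.nextInt(100)).restartUntil(_ % 2 == 0)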

  71. final def rethrow[E1 >: E, B](implicit ev: <:<[A, Either[E1, B]]): IO[E1, B]

    Inverse of attempt. Creates a new IO that absorbs Either.

    IO.now(Right(42)).rethrow <-> IO.now(42)

    IO.now(Left("error")).rethrow <-> IO.raiseError("error")

  72. final def runAsync(cb: (Either[Cause[E], A]) => Unit)(implicit s: Scheduler): Cancelable

    Triggers the asynchronous execution, with a provided callback that's going to be called at some point in the future with the final result.

    Note that without invoking runAsync on an IO, nothing gets evaluated, as an IO has lazy behavior.

    import scala.concurrent.duration._
    import monix.bio.Cause
    // A Scheduler is needed for executing tasks via `runAsync`
    import monix.execution.Scheduler.Implicits.global
    
    // Nothing executes yet
    val task: IO[String, String] =
      for {
        _ <- IO.sleep(3.seconds)
        r <- IO.evalTotal { println("Executing..."); "Hello!" }
      } yield r
    
    // Triggering the task's execution:
    val f = task.runAsync {
      case Right(str: String) =>
        println(s"Received: $$str")
      case Left(Cause.Termination(e)) =>
        global.reportFailure(e)
      case Left(Cause.Error(str: String)) =>
        println(s"Received expected error: $$str")
    }
    
    // Or in case we change our mind
    f.cancel()

    BiCallback

    When executing the task via this method, the user is required to supply a side effecting callback with the signature: Either[Cause[E], A] => Unit.

    This will be used by the implementation to signal completion, signaling either a Right(value) or a Left(error).

    IO however uses BiCallback internally, so you can supply a BiCallback instance instead and it will be used to avoid unnecessary boxing. It also has handy utilities.

    Example, equivalent to the above:

    import monix.bio.BiCallback
    
    task.runAsync(new BiCallback[String, String] {
      def onSuccess(str: String) =
        println(s"Received: $$str")
      def onError(e: String) =
        println(s"Received expected error: $$e")
      def onTermination(e: Throwable) =
        global.reportFailure(e)
    })

    Example, equivalent to runAsyncAndForget:

    task.runAsync(BiCallback.empty)

    Completing a scala.concurrent.Promise:

    import scala.concurrent.Promise
    
    val p = Promise[Either[String, String]]()
    task.runAsync(BiCallback.fromPromise(p))

    UNSAFE (referential transparency) — this operation can trigger the execution of side effects, which breaks referential transparency and is thus not a pure function.

    Normally these functions shouldn't be called until "the end of the world", which is to say at the end of the program (for a console app), or at the end of a web request (in case you're working with a web framework or toolkit that doesn't provide good integration with Monix's Task via Cats-Effect).

    Otherwise for modifying or operating on tasks, prefer its pure functions like map and flatMap. In FP code don't use runAsync. Remember that Task is not a 1:1 replacement for Future, Task being a very different abstraction.

    cb

    is a callback that will be invoked upon completion, either with a successful result, or with an error; note that you can use monix.bio.BiCallback for extra performance (avoids the boxing in scala.Either)

    s

    is an injected Scheduler that gets used whenever asynchronous boundaries are needed when evaluating the task; a Scheduler is in general needed when the Task needs to be evaluated via runAsync

    returns

    a Cancelable that can be used to cancel a running task

    Annotations
    @UnsafeBecauseImpure()
  73. final def runAsyncAndForget(implicit s: Scheduler): Unit

    Triggers the asynchronous execution of the source task in a "fire and forget" fashion.

    Starts the execution of the task, but discards any result generated asynchronously and doesn't return any cancelable tokens either. This affords some optimizations — for example the underlying run-loop doesn't need to worry about cancelation. Also the call-site is more clear in intent.

    Example:

    import monix.execution.Scheduler.Implicits.global
    
    val task = Task(println("Hello!"))
    
    // We don't care about the result, we don't care about the
    // cancellation token, we just want this thing to run:
    task.runAsyncAndForget

    UNSAFE (referential transparency) — this operation can trigger the execution of side effects, which breaks referential transparency and is thus not a pure function.

    Normally these functions shouldn't be called until "the end of the world", which is to say at the end of the program (for a console app), or at the end of a web request (in case you're working with a web framework or toolkit that doesn't provide good integration with Monix's Task via Cats-Effect).

    Otherwise for modifying or operating on tasks, prefer its pure functions like map and flatMap. In FP code don't use runAsync. Remember that Task is not a 1:1 replacement for Future, Task being a very different abstraction.

    s

    is an injected Scheduler that gets used whenever asynchronous boundaries are needed when evaluating the task; a Scheduler is in general needed when the Task needs to be evaluated via runAsync

    Annotations
    @UnsafeBecauseImpure()
  74. def runAsyncAndForgetOpt(implicit s: Scheduler, opts: Options): Unit

    Triggers the asynchronous execution in a "fire and forget" fashion, like normal runAsyncAndForget, but includes the ability to specify IO.Options that can modify the behavior of the run-loop.

    This allows you to specify options such as:

    • enabling support for IOLocal
    • disabling auto-cancelable run-loops

    See the description of runAsyncOpt for an example of customizing the default IO.Options.

    See the description of runAsyncAndForget for an example of running as a "fire and forget".

    UNSAFE (referential transparency) — this operation can trigger the execution of side effects, which breaks referential transparency and is thus not a pure function.

    Normally these functions shouldn't be called until "the end of the world", which is to say at the end of the program (for a console app), or at the end of a web request (in case you're working with a web framework or toolkit that doesn't provide good integration with Monix's Task via Cats-Effect).

    Otherwise for modifying or operating on tasks, prefer its pure functions like map and flatMap. In FP code don't use runAsync. Remember that Task is not a 1:1 replacement for Future, Task being a very different abstraction.

    s

    is an injected Scheduler that gets used whenever asynchronous boundaries are needed when evaluating the task; a Scheduler is in general needed when the Task needs to be evaluated via runAsync

    opts

    a set of Options that determine the behavior of Task's run-loop.

    Annotations
    @UnsafeBecauseImpure()
  75. final def runAsyncF[E1 >: E](cb: (Either[Cause[E1], A]) => Unit)(implicit s: Scheduler): CancelToken[UIO]

    Triggers the asynchronous execution, returning a Task[Unit] (aliased to CancelToken[Task] in Cats-Effect) which can cancel the running computation.

    This is the more potent version of runAsync, because the returned cancelation token is a Task[Unit] that can be used to back-pressure on the completion of cancellation, in case the finalizers are specified as asynchronous actions that are expensive to complete.

    Example:

    import scala.concurrent.duration._
    
    val task = Task("Hello!").bracketCase { str =>
      Task(println(str))
    } { (_, exitCode) =>
      // Finalization
      UIO(println(s"Finished via exit code: $$exitCode"))
        .delayExecution(3.seconds)
    }

    In this example we have a task with a registered finalizer (via bracketCase) that takes 3 whole seconds to finish. Via normal runAsync the returned cancelation token has no capability to wait for its completion.

    import monix.execution.Callback
    import monix.execution.Scheduler.Implicits.global
    
    val cancel = task.runAsyncF(Callback.empty)
    
    // Triggering `cancel` and we can wait for its completion
    for (_ <- cancel.runToFuture) {
      // Takes 3 seconds to print
      println("Resources were released!")
    }

    WARN: back-pressuring on the completion of finalizers is not always a good idea. Avoid it if you can.

    BiCallback

    When executing the task via this method, the user is required to supply a side effecting callback with the signature: Either[Cause[E], A] => Unit.

    This will be used by the implementation to signal completion, signaling either a Right(value) or a Left(error).

    IO however uses BiCallback internally, so you can supply a BiCallback instance instead and it will be used to avoid unnecessary boxing. It also has handy utilities.

    UNSAFE (referential transparency) — this operation can trigger the execution of side effects, which breaks referential transparency and is thus not a pure function.

    Normally these functions shouldn't be called until "the end of the world", which is to say at the end of the program (for a console app), or at the end of a web request (in case you're working with a web framework or toolkit that doesn't provide good integration with Monix's Task via Cats-Effect).

    Otherwise for modifying or operating on tasks, prefer its pure functions like map and flatMap. In FP code don't use runAsync. Remember that Task is not a 1:1 replacement for Future, Task being a very different abstraction.

    NOTE: the F suffix comes from F[_], highlighting our usage of CancelToken[F] to return a Task[Unit], instead of a plain and side effectful Cancelable object.

    cb

    is a callback that will be invoked upon completion, either with a successful result, or with an error; note that you can use monix.bio.BiCallback for extra performance (avoids the boxing in scala.Either)

    s

    is an injected Scheduler that gets used whenever asynchronous boundaries are needed when evaluating the task; a Scheduler is in general needed when the Task needs to be evaluated via runAsync

    returns

    a Task[Unit], aliased via Cats-Effect as a CancelToken[Task], that can be used to cancel the running task. Given that this is a Task, it can describe asynchronous finalizers (if the source had any), therefore users can apply back-pressure on the completion of such finalizers.

    Annotations
    @UnsafeBecauseImpure()
  76. def runAsyncOpt(cb: (Either[Cause[E], A]) => Unit)(implicit s: Scheduler, opts: Options): Cancelable

    Triggers the asynchronous execution, much like normal runAsync, but includes the ability to specify IO.Options that can modify the behavior of the run-loop.

    This allows you to specify options such as:

    • enabling support for IOLocal
    • disabling auto-cancelable run-loops

    Example:

    import monix.bio.{Cause, IO, IOLocal}
    import monix.execution.Scheduler.Implicits.global
    
    val task =
      for {
        local <- IOLocal(0)
        _     <- local.write(100)
        _     <- IO.shift
        value <- local.read
      } yield value
    
    // We need to activate support of IOLocal via:
    implicit val opts = IO.defaultOptions.enableLocalContextPropagation
    
    // Actual execution that depends on these custom options:
    task.runAsyncOpt {
      case Right(value) =>
        println(s"Received: $$value")
      case Left(Cause.Termination(e)) =>
        global.reportFailure(e)
      case Left(Cause.Error(str)) =>
        println(s"Received typed error: $$str")
    }

    See IO.Options.

    BiCallback

    When executing the task via this method, the user is required to supply a side effecting callback with the signature: Either[Cause[E], A] => Unit.

    This will be used by the implementation to signal completion, signaling either a Right(value) or a Left(error).

    IO however uses BiCallback internally, so you can supply a BiCallback instance instead and it will be used to avoid unnecessary boxing. It also has handy utilities.

    UNSAFE (referential transparency) — this operation can trigger the execution of side effects, which breaks referential transparency and is thus not a pure function.

    Normally these functions shouldn't be called until "the end of the world", which is to say at the end of the program (for a console app), or at the end of a web request (in case you're working with a web framework or toolkit that doesn't provide good integration with Monix's Task via Cats-Effect).

    Otherwise for modifying or operating on tasks, prefer its pure functions like map and flatMap. In FP code don't use runAsync. Remember that Task is not a 1:1 replacement for Future, Task being a very different abstraction.

    cb

    is a callback that will be invoked upon completion, either with a successful result, or with an error; note that you can use monix.bio.BiCallback for extra performance (avoids the boxing in scala.Either)

    s

    is an injected Scheduler that gets used whenever asynchronous boundaries are needed when evaluating the task; a Scheduler is in general needed when the Task needs to be evaluated via runAsync

    opts

    a set of Options that determine the behavior of Task's run-loop.

    returns

    a Cancelable that can be used to cancel a running task

    Annotations
    @UnsafeBecauseImpure()
  77. def runAsyncOptF[E1 >: E](cb: (Either[Cause[E], A]) => Unit)(implicit s: Scheduler, opts: Options): CancelToken[UIO]

    Triggers the asynchronous execution, much like normal runAsyncF, but includes the ability to specify IO.Options that can modify the behavior of the run-loop.

    This allows you to specify options such as:

    • enabling support for IOLocal
    • disabling auto-cancelable run-loops

    See the description of runToFutureOpt for an example.

    The returned cancelation token is a Task[Unit] that can be used to back-pressure on the completion of cancellation, in case the finalizers are specified as asynchronous actions that are expensive to complete.

    See the description of runAsyncF for an example.

    WARN: back-pressuring on the completion of finalizers is not always a good idea. Avoid it if you can.

    BiCallback

    When executing the task via this method, the user is required to supply a side effecting callback with the signature: Either[Cause[E], A] => Unit.

    This will be used by the implementation to signal completion, signaling either a Right(value) or a Left(error).

    IO however uses BiCallback internally, so you can supply a BiCallback instance instead and it will be used to avoid unnecessary boxing. It also has handy utilities.

    UNSAFE (referential transparency) — this operation can trigger the execution of side effects, which breaks referential transparency and is thus not a pure function.

    Normally these functions shouldn't be called until "the end of the world", which is to say at the end of the program (for a console app), or at the end of a web request (in case you're working with a web framework or toolkit that doesn't provide good integration with Monix's Task via Cats-Effect).

    Otherwise for modifying or operating on tasks, prefer its pure functions like map and flatMap. In FP code don't use runAsync. Remember that Task is not a 1:1 replacement for Future, Task being a very different abstraction.

    NOTE: the F suffix comes from F[_], highlighting our usage of CancelToken[F] to return a Task[Unit], instead of a plain and side effectful Cancelable object.

    cb

    is a callback that will be invoked upon completion, either with a successful result, or with an error; note that you can use monix.bio.BiCallback for extra performance (avoids the boxing in scala.Either)

    s

    is an injected Scheduler that gets used whenever asynchronous boundaries are needed when evaluating the task; a Scheduler is in general needed when the Task needs to be evaluated via runAsync

    opts

    a set of Options that determine the behavior of Task's run-loop.

    returns

    a Task[Unit], aliased via Cats-Effect as a CancelToken[Task], that can be used to cancel the running task. Given that this is a Task, it can describe asynchronous finalizers (if the source had any), therefore users can apply back-pressure on the completion of such finalizers.

    Annotations
    @UnsafeBecauseImpure()
  78. final def runAsyncUncancelable(cb: (Either[Cause[E], A]) => Unit)(implicit s: Scheduler): Unit

    Triggers the asynchronous execution of the source task, but runs it in uncancelable mode.

    This is an optimization over plain runAsync or runAsyncF that doesn't give you a cancellation token for cancelling the task. The runtime therefore doesn't need to keep any state related to cancellation while evaluating it.

    import scala.concurrent.duration._
    import monix.bio.Cause
    import monix.execution.Scheduler.Implicits.global
    
    val task: IO[String, String] =
      for {
        _ <- IO.sleep(3.seconds)
        r <- UIO { println("Executing..."); "Hello!" }
      } yield r
    
    // Triggering the task's execution, without receiving any
    // cancelation tokens
    task.runAsyncUncancelable {
      case Right(str) =>
        println(s"Received: $$str")
      case Left(Cause.Termination(e)) =>
        global.reportFailure(e)
      case Left(Cause.Error(str)) =>
        println(s"Received typed error: $$str")
    }

    BiCallback

    When executing the task via this method, the user is required to supply a side effecting callback with the signature: Either[Cause[E], A] => Unit.

    This will be used by the implementation to signal completion, signaling either a Right(value) or a Left(error).

    IO however uses BiCallback internally, so you can supply a BiCallback instance instead and it will be used to avoid unnecessary boxing. It also has handy utilities.

    UNSAFE (referential transparency) — this operation can trigger the execution of side effects, which breaks referential transparency and is thus not a pure function.

    Normally these functions shouldn't be called until "the end of the world", which is to say at the end of the program (for a console app), or at the end of a web request (in case you're working with a web framework or toolkit that doesn't provide good integration with Monix's Task via Cats-Effect).

    Otherwise for modifying or operating on tasks, prefer its pure functions like map and flatMap. In FP code don't use runAsync. Remember that Task is not a 1:1 replacement for Future, Task being a very different abstraction.

    s

    is an injected Scheduler that gets used whenever asynchronous boundaries are needed when evaluating the task; a Scheduler is in general needed when the Task needs to be evaluated via runAsync

    Annotations
    @UnsafeBecauseImpure()
  79. def runAsyncUncancelableOpt(cb: (Either[Cause[E], A]) => Unit)(implicit s: Scheduler, opts: Options): Unit

    Triggers the asynchronous execution in uncancelable mode, like runAsyncUncancelable, but includes the ability to specify IO.Options that can modify the behavior of the run-loop.

    This allows you to specify options such as:

    • enabling support for IOLocal
    • disabling auto-cancelable run-loops

    See the description of runAsyncOpt for an example of customizing the default IO.Options.

    This is an optimization over plain runAsyncOpt or runAsyncOptF that doesn't give you a cancellation token for cancelling the task. The runtime therefore doesn't need to keep any state related to cancellation while evaluating it.

    BiCallback

    When executing the task via this method, the user is required to supply a side effecting callback with the signature: Either[Cause[E], A] => Unit.

    This will be used by the implementation to signal completion, signaling either a Right(value) or a Left(error).

    IO however uses BiCallback internally, so you can supply a BiCallback instance instead and it will be used to avoid unnecessary boxing. It also has handy utilities.

    s

    is an injected Scheduler that gets used whenever asynchronous boundaries are needed when evaluating the task; a Scheduler is in general needed when the Task needs to be evaluated via runAsync

    opts

    a set of Options that determine the behavior of Task's run-loop.

    Annotations
    @UnsafeBecauseImpure()
  80. final def runSyncStep(implicit s: Scheduler): Either[IO[E, A], A]

    Executes the source until completion, or until the first async boundary, whichever comes first.

    This operation is meant to be compliant with cats.effect.Effect.runSyncStep, but without suspending the evaluation in IO.

    WARNING: This method is a partial function, throwing exceptions in case errors happen immediately (synchronously).

    Usage sample:

    import monix.execution.Scheduler.Implicits.global
    import scala.util._
    import scala.util.control.NonFatal
    
    try Task(42).runSyncStep match {
      case Right(a) => println("Success: " + a)
      case Left(task) =>
        task.runToFuture.onComplete {
          case Success(a) => println("Async success: " + a)
          case Failure(e) => println("Async error: " + e)
        }
    } catch {
      case NonFatal(e) =>
        println("Error: " + e)
    }

    Obviously the purpose of this method is to be used for optimizations.

    UNSAFE (referential transparency) — this operation can trigger the execution of side effects, which breaks referential transparency and is thus not a pure function.

    Normally these functions shouldn't be called until "the end of the world", which is to say at the end of the program (for a console app), or at the end of a web request (in case you're working with a web framework or toolkit that doesn't provide good integration with Monix's Task via Cats-Effect).

    Otherwise for modifying or operating on tasks, prefer its pure functions like map and flatMap. In FP code don't use runAsync. Remember that Task is not a 1:1 replacement for Future, Task being a very different abstraction.

    s

    is an injected Scheduler that gets used whenever asynchronous boundaries are needed when evaluating the task; a Scheduler is in general needed when the Task needs to be evaluated via runAsync

    returns

    Right(result) in case a result was processed, or Left(task) in case an asynchronous boundary was hit and further async execution is needed

    Annotations
    @UnsafeBecauseImpure()
    See also

    runSyncUnsafe, the blocking execution mode that can only work on top of the JVM.

  81. final def runSyncStepOpt(implicit s: Scheduler, opts: Options): Either[IO[E, A], A]

    A variant of runSyncStep that takes an implicit IO.Options from the current scope.

    This helps in tuning the evaluation model of the task.

    UNSAFE (referential transparency) — this operation can trigger the execution of side effects, which breaks referential transparency and is thus not a pure function.

    Normally these functions shouldn't be called until "the end of the world", which is to say at the end of the program (for a console app), or at the end of a web request (in case you're working with a web framework or toolkit that doesn't provide good integration with Monix's Task via Cats-Effect).

    Otherwise for modifying or operating on tasks, prefer its pure functions like map and flatMap. In FP code don't use runAsync. Remember that Task is not a 1:1 replacement for Future, Task being a very different abstraction.

    s

    is an injected Scheduler that gets used whenever asynchronous boundaries are needed when evaluating the task; a Scheduler is in general needed when the Task needs to be evaluated via runAsync

    opts

    a set of Options that determine the behavior of Task's run-loop.

    returns

    Right(result) in case a result was processed, or Left(task) in case an asynchronous boundary was hit and further async execution is needed

    Annotations
    @UnsafeBecauseImpure()
    See also

    runSyncStep

  82. final def runSyncUnsafe(timeout: Duration = Duration.Inf)(implicit s: Scheduler, permit: CanBlock, ev: <:<[E, Throwable]): A

    Evaluates the source task synchronously and returns the result immediately or blocks the underlying thread until the result is ready.

    The method requires the error type to be Throwable. Note that it also works for Nothing (i.e. UIO), so if you have a different error type you can use task.attempt.runSyncUnsafe to receive an Either[E, A], or any other error-handling operator.

    WARNING: blocking operations are unsafe and incredibly error prone on top of the JVM. It's a good practice to not block any threads and use the asynchronous runAsync methods instead.

    In general prefer to use the asynchronous IO.runAsync or IO.runToFuture and to structure your logic around asynchronous actions in a non-blocking way. But in case you're blocking only once, in main, at the "edge of the world" so to speak, then it's OK.

    Sample:

    import monix.execution.Scheduler.Implicits.global
    import scala.concurrent.duration._
    
    Task(42).runSyncUnsafe(3.seconds)

    This is equivalent to:

    import scala.concurrent.Await
    
    Await.result[Int](Task(42).runToFuture, 3.seconds)

    Some implementation details:

    • blocking the underlying thread is done by triggering Scala's BlockingContext (scala.concurrent.blocking), just like Scala's Await.result
    • the timeout is mandatory, just like when using Scala's Await.result, in order to make the caller aware that the operation is dangerous and that setting a timeout is good practice
    • the loop starts in an execution mode that ignores BatchedExecution or AlwaysAsyncExecution, until the first asynchronous boundary. This is because we want to block the underlying thread for the result, in which case preserving fairness by forcing (batched) async boundaries doesn't do us any good, quite the contrary, the underlying thread being stuck until the result is available or until the timeout exception gets triggered.

    Not supported on top of JavaScript engines and trying to use it with Scala.js will trigger a compile time error.

    For optimizations on top of JavaScript you can use runSyncStep instead.

    UNSAFE (referential transparency) — this operation can trigger the execution of side effects, which breaks referential transparency and is thus not a pure function.

    Normally these functions shouldn't be called until "the end of the world", which is to say at the end of the program (for a console app), or at the end of a web request (in case you're working with a web framework or toolkit that doesn't provide good integration with Monix's Task via Cats-Effect).

    Otherwise for modifying or operating on tasks, prefer its pure functions like map and flatMap. In FP code don't use runAsync. Remember that Task is not a 1:1 replacement for Future, Task being a very different abstraction.

    timeout

    is a duration that specifies the maximum amount of time that this operation is allowed to block the underlying thread. If the timeout expires before the result is ready, a TimeoutException gets thrown. Note that you're allowed to pass an infinite duration (with Duration.Inf), but unless it's main that you're blocking and unless you're doing it only once, then this is definitely not recommended — provide a finite timeout in order to avoid deadlocks.

    s

    is an injected Scheduler that gets used whenever asynchronous boundaries are needed when evaluating the task; a Scheduler is in general needed when the Task needs to be evaluated via runAsync

    permit

    is an implicit value that's only available for the JVM and not for JavaScript, its purpose being to stop usage of this operation on top of engines that do not support blocking threads.

    Annotations
    @UnsafeBecauseImpure() @UnsafeBecauseBlocking()
  83. final def runSyncUnsafeOpt(timeout: Duration = Duration.Inf)(implicit s: Scheduler, opts: Options, permit: CanBlock, ev: <:<[E, Throwable]): A

    Variant of runSyncUnsafe that takes a IO.Options implicitly from the scope in order to tune the evaluation model of the task.

    This allows you to specify options such as:

    • enabling support for IOLocal
    • disabling auto-cancelable run-loops

    See the description of runAsyncOpt for an example of customizing the default IO.Options.

    UNSAFE (referential transparency) — this operation can trigger the execution of side effects, which breaks referential transparency and is thus not a pure function.

    Normally these functions shouldn't be called until "the end of the world", which is to say at the end of the program (for a console app), or at the end of a web request (in case you're working with a web framework or toolkit that doesn't provide good integration with Monix's Task via Cats-Effect).

    Otherwise for modifying or operating on tasks, prefer its pure functions like map and flatMap. In FP code don't use runAsync. Remember that Task is not a 1:1 replacement for Future, Task being a very different abstraction.

    timeout

    is a duration that specifies the maximum amount of time that this operation is allowed to block the underlying thread. If the timeout expires before the result is ready, a TimeoutException gets thrown. Note that you're allowed to pass an infinite duration (with Duration.Inf), but unless it's main that you're blocking and unless you're doing it only once, then this is definitely not recommended — provide a finite timeout in order to avoid deadlocks.

    s

    is an injected Scheduler that gets used whenever asynchronous boundaries are needed when evaluating the task; a Scheduler is in general needed when the Task needs to be evaluated via runAsync

    opts

    a set of Options that determine the behavior of Task's run-loop.

    permit

    is an implicit value that's only available for the JVM and not for JavaScript, its purpose being to stop usage of this operation on top of engines that do not support blocking threads.

    Annotations
    @UnsafeBecauseImpure() @UnsafeBecauseBlocking()
    See also

    runSyncUnsafe

  84. final def runToFuture(implicit s: Scheduler, ev: <:<[E, Throwable]): CancelableFuture[A]

    Triggers the asynchronous execution, returning a cancelable CancelableFuture that can be awaited for the final result or canceled.

    Note that without invoking runAsync on a Task, nothing gets evaluated, as a Task has lazy behavior.

    import scala.concurrent.duration._
    // A Scheduler is needed for executing tasks via `runAsync`
    import monix.execution.Scheduler.Implicits.global
    
    // Nothing executes yet
    val task: Task[String] =
      for {
        _ <- Task.sleep(3.seconds)
        r <- Task { println("Executing..."); "Hello!" }
      } yield r
    
    // Triggering the task's execution:
    val f = task.runToFuture
    
    // Or in case we change our mind
    f.cancel()

    UNSAFE (referential transparency) — this operation can trigger the execution of side effects, which breaks referential transparency and is thus not a pure function.

    Normally these functions shouldn't be called until "the end of the world", which is to say at the end of the program (for a console app), or at the end of a web request (in case you're working with a web framework or toolkit that doesn't provide good integration with Monix's Task via Cats-Effect).

    Otherwise for modifying or operating on tasks, prefer its pure functions like map and flatMap. In FP code don't use runAsync. Remember that Task is not a 1:1 replacement for Future, Task being a very different abstraction.

    BAD CODE:

    import monix.execution.CancelableFuture
    import monix.execution.Scheduler.Implicits.global
    import scala.concurrent.Await
    import scala.concurrent.duration._
    
    // ANTI-PATTERN 1: Unnecessary side effects
    def increment1(sample: UIO[Int]): CancelableFuture[Int] = {
      // No reason to trigger `runAsync` for this operation
      sample.runToFuture.map(_ + 1)
    }
    
    // ANTI-PATTERN 2: blocking threads makes it worse than (1)
    def increment2(sample: UIO[Int]): Int = {
      // Blocking threads is totally unnecessary
      val x = Await.result(sample.runToFuture, 5.seconds)
      x + 1
    }
    
    // ANTI-PATTERN 3: this is even WORSE than (2)!
    def increment3(sample: Task[Int]): Task[Int] = {
      // Triggering side-effects, but misleading users/readers
      // into thinking this function is pure via the return type
      IO.fromFuture(sample.runToFuture.map(_ + 1))
    }

    Instead prefer the pure versions. IO has its own map, flatMap, onErrorHandleWith or bracketCase, which are really powerful and allow you to operate on a task in whatever way you like without escaping IO's context and triggering unwanted side-effects.

    s

    is an injected Scheduler that gets used whenever asynchronous boundaries are needed when evaluating the task; a Scheduler is in general needed when the Task needs to be evaluated via runAsync

    returns

    a CancelableFuture that can be used to extract the result or to cancel a running task.

    Annotations
    @UnsafeBecauseImpure()
  85. def runToFutureOpt(implicit s: Scheduler, opts: Options, ev: <:<[E, Throwable]): CancelableFuture[A]

    Triggers the asynchronous execution, much like normal runToFuture, but includes the ability to specify Options that can modify the behavior of the run-loop.

    This is the configurable version of runToFuture. It allows you to specify options such as:

    • enabling support for IOLocal
    • disabling auto-cancelable run-loops

    See IO.Options. Example:

    import monix.bio.{IO, IOLocal}
    import monix.execution.Scheduler.Implicits.global
    
    val task =
      for {
        local <- IOLocal(0)
        _     <- local.write(100)
        _     <- IO.shift
        value <- local.read
      } yield value
    
    // We need to activate support of IOLocal via:
    implicit val opts = IO.defaultOptions.enableLocalContextPropagation
    // Actual execution that depends on these custom options:
    // task.runToFutureOpt

    UNSAFE (referential transparency) — this operation can trigger the execution of side effects, which breaks referential transparency and is thus not a pure function.

    Normally these functions shouldn't be called until "the end of the world", which is to say at the end of the program (for a console app), or at the end of a web request (in case you're working with a web framework or toolkit that doesn't provide good integration with Monix's Task via Cats-Effect).

    Otherwise for modifying or operating on tasks, prefer its pure functions like map and flatMap. In FP code don't use runAsync. Remember that Task is not a 1:1 replacement for Future, Task being a very different abstraction.

    PLEASE READ the advice on anti-patterns at runToFuture.

    s

    is an injected Scheduler that gets used whenever asynchronous boundaries are needed when evaluating the task; a Scheduler is in general needed when the Task needs to be evaluated via runAsync

    opts

    a set of Options that determine the behavior of Task's run-loop.

    returns

    a CancelableFuture that can be used to extract the result or to cancel a running task.

    Annotations
    @UnsafeBecauseImpure()
  86. final def start: UIO[Fiber[E, A]]

    Start execution of the source suspended in the Task context.

    This can be used for non-deterministic / concurrent execution. The following code is more or less equivalent to IO.parMap2 (minus the behavior on error handling and cancellation):

    def par2[A, B](ta: Task[A], tb: Task[B]): Task[(A, B)] =
      for {
        fa <- ta.start
        fb <- tb.start
         a <- fa.join
         b <- fb.join
      } yield (a, b)

    Note that in such cases usage of Task.parMap2 (and Task.parMap3, etc.) is still recommended because of behavior on error and cancellation; consider that in the example above, if the first task finishes in error, the second task doesn't get cancelled.

    This operation forces an asynchronous boundary before execution

  87. final def startAndForget: UIO[Unit]

    Start asynchronous execution of the source suspended in the IO context, running it in the background and discarding the result.

    Similar to start after mapping the result to Unit. The law below holds:

    bio.startAndForget <-> bio.start.map(_ => ())
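
    A minimal sketch of firing a task in the background while the main flow continues (the printed messages are just for illustration):

    import monix.bio.{IO, UIO}
    import scala.concurrent.duration._
    
    val program: UIO[Unit] =
      for {
        // Runs in the background after a forced async boundary
        _ <- IO.evalTotal(println("background")).delayExecution(1.second).startAndForget
        // Continues immediately, without waiting for the task above
        _ <- IO.evalTotal(println("foreground"))
      } yield ()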

  88. final def synchronized[T0](arg0: => T0): T0
    Definition Classes
    AnyRef
  89. final def tapError[E1 >: E, B](f: (E) => IO[E1, B]): IO[E1, A]

    Creates a new task that will run a provided effect in case of a typed error and raise the original error in case the provided function is successful.

    Example:

    import monix.bio.IO
    
    // will result in Left("Error") and print the error to console
    IO.raiseError("Error1").tapError(err => IO.evalTotal(println(err)))

    If the provided function returns an error, then the resulting task will raise that error instead.

    Example:

    // will result in Left("Error2")
    IO.raiseError("Error1").tapError(err => IO.raiseError("Error2"))
  90. final def tapEval[E1 >: E, B](f: (A) => IO[E1, B]): IO[E1, A]

    Creates a new IO that will run a provided effect on success and return the original value.
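
    Example, mirroring the tapError sample above (a minimal sketch):

    import monix.bio.IO
    
    // prints 42 and completes with 42
    IO.now(42).tapEval(value => IO.evalTotal(println(value)))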

  91. final def timed: IO[E, (FiniteDuration, A)]

    Measures execution time of the source task and returns both its duration and the computed value.

    Example:

    for {
      result <- IO(1 + 1).timed
      (duration, value) = result
      _ <- IO(println("Executed in " + duration.toMillis + " ms"))
    } yield value
  92. final def timeout(after: FiniteDuration): IO[E, Option[A]]

    Returns a Task that mirrors the source Task but returns None in case the given duration passes without the task emitting any item. Otherwise, returns Some of the resulting value.
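
    A minimal sketch, assuming a source that needs more time than the allowed duration:

    import monix.bio.{IO, UIO}
    import scala.concurrent.duration._
    
    val slow: UIO[Int] = IO.evalTotal(42).delayExecution(5.seconds)
    
    // Completes with None, since the source needs 5 seconds
    val guarded: UIO[Option[Int]] = slow.timeout(3.seconds)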

  93. final def timeoutL(after: UIO[FiniteDuration]): IO[E, Option[A]]

    Returns a Task that mirrors the source Task but returns None in case the given duration passes without the task emitting any item. Otherwise, returns Some of the resulting value.

  94. final def timeoutTo[E1 >: E, B >: A](after: FiniteDuration, backup: IO[E1, B]): IO[E1, B]

    Returns a Task that mirrors the source Task but switches to the given backup Task in case the given duration passes without the source emitting any item.
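
    For illustration, a minimal sketch that falls back to a typed error when the source is too slow (the val names are hypothetical):

    import monix.bio.IO
    import scala.concurrent.duration._
    
    val slow = IO.evalTotal("data").delayExecution(10.seconds)
    
    // Switches to the backup task after 3 seconds
    val guarded: IO[String, String] =
      slow.timeoutTo(3.seconds, IO.raiseError("timed out"))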

  95. final def timeoutToL[E1 >: E, B >: A](after: UIO[FiniteDuration], backup: IO[E1, B]): IO[E1, B]

    Returns a Task that mirrors the source Task but switches to the given backup Task in case the given duration passes without the source emitting any item.

    Useful when the timeout is variable, e.g. when the task is running in a loop with deadline semantics.

    Example:

    import monix.execution.Scheduler.Implicits.global
    import scala.concurrent.duration._
    import java.util.concurrent.TimeoutException
    
    val deadline = 10.seconds.fromNow
    
    val singleCallTimeout = 2.seconds
    
    // re-evaluate deadline time on every request
    val actualTimeout = UIO(singleCallTimeout.min(deadline.timeLeft))
    val error = IO.raiseError(new TimeoutException("Task timed-out"))
    
    // expensive remote call
    def call(): Unit = ()
    
    val remoteCall = IO(call())
      .timeoutToL(actualTimeout, error)
      .onErrorRestart(100)
      .timeout(deadline.time)

    Note that this method takes the evaluation duration of the timeout task into account, e.g. if it took 3 seconds to evaluate after to a value of 5 seconds, then this task will time out exactly 5 seconds from the moment the computation started, i.e. 2 seconds after the timeout task has been evaluated.

  96. final def timeoutWith[E1 >: E, B >: A](after: FiniteDuration, error: E1): IO[E1, B]

    Returns a Task that mirrors the source Task but that triggers a specified error in case the given duration passes without the task emitting any item.

    error

    is the error raised after the given duration passes
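
    A minimal sketch, raising a typed error instead of switching to a backup task (the val names are hypothetical):

    import monix.bio.IO
    import scala.concurrent.duration._
    
    val slow = IO.evalTotal("data").delayExecution(10.seconds)
    
    // Fails with the typed error "timed out" after 3 seconds
    val guarded: IO[String, String] = slow.timeoutWith(3.seconds, "timed out")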

  97. final def to[F[_]](implicit F: IOLift[F], ev: <:<[E, Throwable]): F[A]

    Generic conversion of Task to any data type for which there's an IOLift implementation available.

    This conversion guarantees:

    • referential transparency
    • similar runtime characteristics (e.g. if the source doesn't block threads on evaluation, then the result shouldn't block threads either)
    • interruptibility, if the target data type is cancelable

    Note that this method is only applicable when the typed error E is also a Throwable, or when the source task is an unexceptional one (i.e. it is a UIO). If you need a conversion from E into a Throwable, take a look at mapError or onErrorHandleWith. If you need a conversion into a UIO, take a look at attempt, materialize or onErrorHandle.

    Sample:

    import cats.effect.{IO => CIO}
    import monix.execution.Scheduler.Implicits.global
    import scala.concurrent.duration._
    
    IO.eval(println("Hello!"))
      .delayExecution(5.seconds)
      .to[CIO]
  98. final def toAsync[F[_]](implicit F: Async[F], eff: Effect[Task], ev: <:<[E, Throwable]): F[A]

    Converts the source task into any data type that implements Async.

    Example:

    import cats.effect.{IO => CIO}
    import monix.execution.Scheduler.Implicits.global
    import scala.concurrent.duration._
    
    IO.eval(println("Hello!"))
      .delayExecution(5.seconds)
      .toAsync[CIO]

    An Effect[Task] instance is needed in scope, which itself might need a Scheduler to be available. Such a requirement is needed because the Task has to be evaluated in order to be converted.

    Note that this method is only applicable when the typed error E is also a Throwable, or when the source task is an unexceptional one (i.e. it is a UIO). If you need a conversion from E into a Throwable, take a look at mapError or onErrorHandleWith. If you need a conversion into a UIO, take a look at attempt, materialize or onErrorHandle.

    NOTE: the resulting instance will NOT be cancelable, as the Task's cancelation token doesn't get carried over. This is implicit in the usage of the cats.effect.Async type class. In the example above this means that the task will still print "Hello!" after 5 seconds, even if the resulting task gets cancelled.

    F

    is the cats.effect.Async instance required in order to perform the conversion

    eff

    is the Effect[Task] instance needed to evaluate tasks; when evaluating tasks, this is the pure alternative to demanding a Scheduler

    See also

    to that is able to convert to any data type that has an IOLift implementation

    toConcurrent that is able to convert to cancelable values via the Concurrent type class.

  99. final def toConcurrent[F[_]](implicit F: Concurrent[F], eff: ConcurrentEffect[Task], ev: <:<[E, Throwable]): F[A]

    Converts the source task into any data type that implements Concurrent.

    Example:

    import cats.effect.{IO => CIO}
    import monix.execution.Scheduler.Implicits.global
    import scala.concurrent.duration._
    
    implicit val cs = CIO.contextShift(global)
    
    IO.eval(println("Hello!"))
      .delayExecution(5.seconds)
      .toConcurrent[CIO]

    A ConcurrentEffect[Task] instance is needed in scope, which itself might need a Scheduler to be available. Such a requirement is needed because the Task has to be evaluated in order to be converted.

    Note that this method is only applicable when the typed error E is also a Throwable, or when the source task is an unexceptional one (i.e. it is a UIO). If you need a conversion from E into a Throwable, take a look at mapError or onErrorHandleWith. If you need a conversion into a UIO, take a look at attempt, materialize or onErrorHandle.

    NOTE: the resulting value is cancelable, via usage of cats.effect.Concurrent.

    F

    is the cats.effect.Concurrent instance required in order to perform the conversion

    eff

    is the ConcurrentEffect[Task] instance needed to evaluate tasks; when evaluating tasks, this is the pure alternative to demanding a Scheduler

    See also

    to that is able to convert into any data type that has an IOLift implementation

    toAsync that is able to convert into non-cancelable values via the Async type class.

  100. final def toReactivePublisher(implicit s: Scheduler, ev: <:<[E, Throwable]): Publisher[A]

    Converts the source task into an org.reactivestreams.Publisher that emits a single item on success, or an error when there is a typed or fatal failure.

    Note that this method is only applicable when the typed error E is also a Throwable, or when the source task is an unexceptional one (i.e. it is a UIO). If you need a conversion from E into a Throwable, take a look at mapError or onErrorHandleWith. If you need a conversion into a UIO, take a look at attempt, materialize or onErrorHandle.

    See reactive-streams.org for the Reactive Streams specification.
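
    A minimal sketch of the conversion (the val name is just for illustration):

    import monix.bio.Task
    import monix.execution.Scheduler.Implicits.global
    import org.reactivestreams.Publisher
    
    // Emits a single 42 to each subscriber, or an error if the task fails
    val publisher: Publisher[Int] = Task(42).toReactivePublisher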

  101. def toString(): String

    Returns a string representation of this task meant for debugging purposes only.

    Definition Classes
    IO → AnyRef → Any
  102. final def uncancelable: IO[E, A]

    Makes the source Task uninterruptible such that a cancel signal (e.g. Fiber.cancel) has no effect.

    import monix.execution.Scheduler.Implicits.global
    import scala.concurrent.duration._
    
    val uncancelable = Task
      .eval(println("Hello!"))
      .delayExecution(10.seconds)
      .uncancelable
      .runToFuture
    
    // No longer works
    uncancelable.cancel()
    
    // After 10 seconds
    // => Hello!
  103. final def void: IO[E, Unit]

    Returns this task mapped to Unit.

  104. final def wait(): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.InterruptedException])
  105. final def wait(arg0: Long, arg1: Int): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.InterruptedException])
  106. final def wait(arg0: Long): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.InterruptedException]) @native()
