Kotlin Coroutines - Structured Concurrency and Virtual Threads

Understanding Kotlin Coroutines

In this article we will take you through how coroutines work internally and what benefits Kotlin coroutines bring. Kotlin coroutines address some long-standing issues in the JVM world, providing a safe, readable and scalable solution for our multithreaded applications.

Let's start by understanding what coroutines are!

Introduction

Kotlin coroutines are very lightweight threads capable of suspending themselves while they wait for any kind of IO operation, releasing their platform thread so it can do useful work for another coroutine.

This makes processing much more efficient, thanks to a much better use of existing resources.

If you have read our article “Understanding Java Virtual Threads: The Death of Async Programming”, all of this will sound familiar to you.

Java virtual threads and Kotlin coroutines are similar in some ways, although Kotlin already provides structured concurrency constructs, something Java still lacks. However, there are plans to release something similar (scopes) soon; some work has been done as part of JEP 428 (Structured Concurrency), which you can check out if interested.

They both try to solve one of the main scalability limitations in the JVM ecosystem: the one-to-one mapping between platform threads and OS threads.

You could get the feeling that Java has, in some way, copied Kotlin coroutines, but we can neither confirm nor deny this hypothesis. Kotlin coroutines were first released in 2018, whereas Java virtual threads were only recently released as a preview feature; however, Project Loom has been ongoing since 2017.

Therefore, it's difficult to tell who came up with the idea first, especially in the JVM ecosystem. We only know that Kotlin released this feature much earlier than Java; you can draw your own conclusions.

In our article "A New Future For Java" we mentioned what we call "The Kotlinisation of Java": a tendency we see in recent Java changes to provide features already present in Kotlin. This is an effort to make Java friendlier for developers, avoiding unnecessary work and hassle.

In this case, we cannot say exactly what happened. Some of these concepts are much older than they seem; coroutines themselves, for instance, date back to the 1960s, so none of this is entirely new in computing.

Let’s see then how Kotlin coroutines work!

Kotlin Coroutines internals

It's time now to see how Kotlin coroutines work and how they behave under different conditions. Internally, Kotlin coroutines are built on something called continuations. A continuation basically allows us to resume some work at the exact point where it was suspended.

The way it works is that when we call a suspendable function, its local variables are created on the stack. When this function reaches a point where it has to wait for an IO operation, it suspends itself, saving its context: its local variables and the point where it paused.

The underlying thread will then carry out work for a different coroutine. Once the IO operation has completed, the coroutine will try to resume. If no thread is available it will wait for one; otherwise it resumes immediately and continues from the point where it left off.
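To build an intuition for continuations, here is a minimal sketch using only the kotlin.coroutines standard library (no kotlinx dependency). The awaitInput function and the savedContinuation variable are our own illustrative names, not part of any API:

```kotlin
import kotlin.coroutines.*

// Holds the suspended coroutine's continuation so we can resume it later
var savedContinuation: Continuation<String>? = null

// A suspending function that pauses itself and hands its continuation to us,
// simulating a coroutine waiting for an IO result
suspend fun awaitInput(): String = suspendCoroutine { cont ->
    savedContinuation = cont  // execution stops here until someone resumes 'cont'
}

fun main() {
    val body: suspend () -> Unit = {
        println("Suspending...")
        val value = awaitInput()         // the coroutine suspends at this exact point
        println("Resumed with: $value")  // ...and continues from here when resumed
    }
    // Start the coroutine with an empty completion continuation
    body.startCoroutine(Continuation(EmptyCoroutineContext) { })

    // The coroutine is now suspended; resuming the saved continuation
    // picks execution back up right after the awaitInput() call
    savedContinuation?.resume("an IO result")
}
```

Resuming the saved continuation makes execution pick up exactly after the suspension point, which is what the coroutine machinery does for you when an IO result arrives.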

We will use very simple examples to make it easy to understand how our tasks get processed.

Each example uses a very simple piece of code simulating a batch of tasks, which can suspend itself when needed. Each task simulates a 10ms IO wait using delay; once that completes, we print the task number for that batch and which platform thread and coroutine it is running on (the @coroutine#N suffix in the thread name appears when the JVM flag -Dkotlinx.coroutines.debug is set). It looks like this:

package com.theboreddev

import kotlinx.coroutines.delay

class Coroutines {
    // Simulates a batch of 50 tasks; each waits 10ms on IO (delay), then
    // reports which platform thread and coroutine it ran on
    suspend fun process(batchName: String) {
        for (i in 1..50) {
            delay(10)
            println("Processed task $i for batch $batchName on thread ${Thread.currentThread().name}")
        }
    }
}

As you can see, it’s very simple. Let’s move to the first of the examples then!

Sequential tasks

The fact that we are using the Kotlin coroutines library doesn't mean that our tasks always run multi-threaded and in parallel. We can launch two tasks using Kotlin coroutines and they will still run sequentially, as in this particular test:

    @OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class)
    @Test
    fun `should run sequentially`() = runTest {

        val time = measureTimeMillis {
            val job = GlobalScope.launch {
                coroutines.process("batch a")
                coroutines.process("batch b")
            }

            job.join()
        }

        println("Completed in $time milliseconds")
    }

In this test we run two batches of tasks using GlobalScope.launch. This will create a new coroutine to run our tasks, although both batches will run on the same platform thread and coroutine. We can see that if we check the logs after running this test:

Processed task 1 for batch batch a on thread DefaultDispatcher-worker-1 @coroutine#2
...
Processed task 50 for batch batch a on thread DefaultDispatcher-worker-1 @coroutine#2
Processed task 1 for batch batch b on thread DefaultDispatcher-worker-1 @coroutine#2
...
Processed task 50 for batch batch b on thread DefaultDispatcher-worker-1 @coroutine#2
Completed in 1230 milliseconds

We have omitted most of the logs to avoid unnecessary clutter. As you can see, every task runs on the same platform thread and coroutine, as expected: we define one single coroutine and run both batches, with all their tasks, inside it.

You may be wondering: what's the benefit if my tasks run sequentially? There is still a huge benefit in Kotlin coroutines when running potentially blocking sequential code.

Every time that a coroutine is paused waiting for an IO operation, the underlying platform thread doesn’t have to wait for its completion. Instead, the platform thread gets assigned to another coroutine to be able to continue being useful during that time. Let’s demonstrate this with a simple test:

    @OptIn(ExperimentalCoroutinesApi::class)
    @Test
    fun `should reuse platform threads when paused`() = runTest {

        val jobs = mutableListOf<Job>()
        val time = measureTimeMillis {
            withContext(Dispatchers.IO) {

                jobs += launch {
                    coroutines.process("batch a")
                    coroutines.process("batch b")
                    coroutines.process("batch c")
                    coroutines.process("batch d")
                    coroutines.process("batch e")
                }

                async {
                    coroutines.process("batch aa")
                    coroutines.process("batch bb")
                }

                async {
                    coroutines.process("batch ba")
                    coroutines.process("batch ab")
                }
            }

            jobs.joinAll()
        }

        println("Completed in $time milliseconds")
    }

We are now running multiple batches sequentially and, on top of that, two sets of two batches each in separate coroutines, to demonstrate that when the coroutine running the sequential batches pauses, the underlying threads can be reused.

These three groups of tasks will run at the same time, and the results will show how a platform thread can do work for any of them. Every time a coroutine yields because it has to wait for an IO operation, the underlying thread can be reused somewhere else. Therefore, by the time the coroutine resumes, the underlying thread could have changed. Let's see the results in our logs.

Processed task 1 for batch batch aa on thread DefaultDispatcher-worker-7 @coroutine#3
Processed task 1 for batch batch ba on thread DefaultDispatcher-worker-5 @coroutine#5
Processed task 1 for batch batch a on thread DefaultDispatcher-worker-3 @coroutine#2
Processed task 2 for batch batch aa on thread DefaultDispatcher-worker-3 @coroutine#3
Processed task 2 for batch batch a on thread DefaultDispatcher-worker-5 @coroutine#2
Processed task 2 for batch batch ba on thread DefaultDispatcher-worker-7 @coroutine#5
Processed task 3 for batch batch aa on thread DefaultDispatcher-worker-7 @coroutine#3
Processed task 3 for batch batch a on thread DefaultDispatcher-worker-5 @coroutine#2
Processed task 3 for batch batch ba on thread DefaultDispatcher-worker-3 @coroutine#5
Processed task 4 for batch batch aa on thread DefaultDispatcher-worker-3 @coroutine#3
Processed task 4 for batch batch ba on thread DefaultDispatcher-worker-7 @coroutine#5
Processed task 4 for batch batch a on thread DefaultDispatcher-worker-5 @coroutine#2
Processed task 5 for batch batch aa on thread DefaultDispatcher-worker-5 @coroutine#3
Processed task 5 for batch batch a on thread DefaultDispatcher-worker-6 @coroutine#2
Processed task 5 for batch batch ba on thread DefaultDispatcher-worker-7 @coroutine#5
...
Completed in 3070 milliseconds

You will notice how every batch always runs on the same coroutine; however, the underlying thread changes quite frequently. For instance, "batch aa" always runs on coroutine number 3, but if you check the pool, it uses three different platform threads for its first five tasks alone!

Now that we know a bit more about how scheduling works, what should we do if we want to run our batches in parallel to save time? Let's see!

Parallel tasks

If we want to run them in parallel, it's as simple as creating a coroutine for each batch. Our test would then look like this:

    @OptIn(ExperimentalCoroutinesApi::class)
    @Test
    fun `should run in parallel`() = runTest {

        val jobs = mutableListOf<Job>()
        val time = measureTimeMillis {
            withContext(Dispatchers.IO) {
                jobs += launch {
                    coroutines.process("batch y")
                }
                jobs += launch {
                    coroutines.process("batch z")
                }
            }
            jobs.joinAll()
        }

        println("Completed in $time milliseconds")
    }

We are now launching two coroutines in parallel. If we look at the logs, we can see how they now run in parallel on different coroutines.

Processed task 1 for batch batch z on thread DefaultDispatcher-worker-3 @coroutine#3
Processed task 1 for batch batch y on thread DefaultDispatcher-worker-2 @coroutine#2
Processed task 2 for batch batch y on thread DefaultDispatcher-worker-3 @coroutine#2
Processed task 2 for batch batch z on thread DefaultDispatcher-worker-2 @coroutine#3
Processed task 3 for batch batch y on thread DefaultDispatcher-worker-2 @coroutine#2
Processed task 3 for batch batch z on thread DefaultDispatcher-worker-3 @coroutine#3
...
Completed in 630 milliseconds

You will have noticed that we use launch to create a new coroutine, but we've seen before that async can be used for that purpose too. What's the difference then?

The difference is that launch doesn't return a value after completing its inner task; it just returns a Job object. async, on the other hand, returns a Deferred object, which is also a Job but additionally carries the value produced on completion of the inner task. You can see how the Deferred interface extends Job:

public interface Deferred<out T> : Job
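A minimal sketch of the difference, using runBlocking just to provide a scope:

```kotlin
import kotlinx.coroutines.*

fun main() = runBlocking {
    // launch returns a Job: fire-and-forget work, no result
    val job: Job = launch { delay(10) }

    // async returns a Deferred<T>: work that produces a value
    val deferred: Deferred<Int> = async { 21 * 2 }

    job.join()                 // just wait for completion
    println(deferred.await())  // wait for completion AND obtain the value: prints 42
}
```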

Let's modify our previous example to sum the results of two parallel tasks; it would look like this:

    @OptIn(ExperimentalCoroutinesApi::class)
    @Test
    fun `should run in parallel and get values`() = runTest {

        val jobs = mutableListOf<Deferred<Int>>()
        var result: Int
        val time = measureTimeMillis {
            withContext(Dispatchers.IO) {
                jobs += async {
                    delay(100)
                    Random.nextInt(0, 100)
                }
                jobs += async {
                    delay(100)
                    Random.nextInt(0, 100)
                }
            }
            result = jobs.awaitAll().sum()
        }

        println("Completed in $time milliseconds with result $result")
    }

First of all, you can see how we are generating random Int numbers in our launched coroutines in this example. You can check our article “How to Generate Random Numbers in Kotlin” to learn different ways to do it.

Coming back to our main topic, you can also see that we now have a list of Deferred objects. We can use awaitAll to wait for all of these tasks to complete, obtaining a List<Int> with one Int per task; once we have that, we can simply use sum to add the values.

If we had just one task returning a value, instead of awaitAll we'd use await. It's pretty intuitive, but let's see a short example:

val r = async {
  delay(100)
  Random.nextInt(0, 100)
}
val value = r.await()

Having said that, if we run our previous test shown above, we get the expected result:

Completed in 112 milliseconds with result 93

One more thing you'll notice in the example using launch is that we create the two coroutines with the launch method directly, instead of GlobalScope.launch. Why is this? Let's try to understand scopes a bit better!

Coroutine scopes

Every time we launch a Kotlin coroutine we need a context to attach it to. When we use GlobalScope we are using the global scope, which lasts for the whole application's lifetime. This means that our coroutine could exist for the entire lifetime of our application.

We are always required to run coroutines inside a context in order to have safer concurrent code. By attaching a coroutine to a coroutine context, the runtime always knows what to do if a coroutine fails.

In situations where we have multiple nested coroutines, forming a hierarchy where each coroutine has a parent and may itself be a parent, the runtime knows which coroutines to cancel in a failure scenario. This is what we call structured concurrency!

In the image below you can clearly see what structured concurrency looks like in Kotlin, where the creation of a new coroutine always involves the creation of a nested scope inside the current scope.

Kotlin Coroutines - Scopes (diagram by the author)

No more threads hanging around in our application, no more being extremely careful about leaking them all over the place! One of the huge benefits of structured concurrency is that we can define hierarchies of coroutines in our application. If any child at any level fails, cancellation gets triggered for every coroutine in the hierarchy.
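As a hedged sketch of that cancellation behaviour, consider two sibling coroutines inside a coroutineScope; fetchBoth is our own illustrative name. The failure of the second child cancels its sibling and propagates to the caller:

```kotlin
import kotlinx.coroutines.*

// A sketch: two sibling coroutines inside one scope; if one fails,
// the other is cancelled and the exception propagates to the caller
suspend fun fetchBoth(): String = coroutineScope {
    val a = async {
        delay(1_000)  // never finishes: cancelled when its sibling fails
        "A"
    }
    val b = async<String> {
        delay(10)
        error("boom")  // this failure cancels 'a' as well
    }
    a.await() + b.await()
}

fun main() = runBlocking {
    try {
        fetchBoth()
    } catch (e: IllegalStateException) {
        println("Whole scope failed: ${e.message}")  // prints "Whole scope failed: boom"
    }
}
```

The long-running sibling never completes on its own: the moment the scope sees a failed child, everything under it is cancelled.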

Using GlobalScope is not a recommended practice, as our coroutines will not be cancelled unless the application shuts down! Keep in mind that, by default, every coroutine inherits its context from its parent. This means that when we launch multiple nested coroutines inside GlobalScope.launch, all of them inherit from the global scope!

This is very risky, as those coroutines can keep running until the application shuts down. Instead, we should establish a scoped context using the withContext method, which accepts a context and a suspend function. If we don't specify a dispatcher when launching coroutines, Kotlin's default dispatcher is used.

There is also a specific dispatcher for blocking tasks, which you can use by passing Dispatchers.IO to the withContext method.

If your application is IO-bound, most of the time you will want Dispatchers.IO; however, we recommend reading about the existing dispatchers to understand which one fits your situation best.
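As a minimal sketch, assuming a hypothetical blocking helper readFileBlocking that stands in for real file or network IO, offloading to the IO dispatcher looks like this:

```kotlin
import kotlinx.coroutines.*

// Stand-in for a real blocking call (file read, JDBC query, ...)
fun readFileBlocking(path: String): String = "contents of $path"

// Runs the blocking work on Dispatchers.IO, a pool sized for blocking tasks,
// then resumes in the caller's original context with the result
suspend fun loadReport(path: String): String = withContext(Dispatchers.IO) {
    readFileBlocking(path)
}

fun main() = runBlocking {
    println(loadReport("/tmp/report.txt"))  // prints "contents of /tmp/report.txt"
}
```

withContext suspends the caller until the block completes on the IO pool, so no thread is blocked while waiting.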

Conclusion

In this article we have given you a deep introduction to Kotlin coroutines and how they help us run concurrent code in a safe, efficient, simple and readable manner. We have seen how important Kotlin coroutine contexts are for the safety of our applications, and how efficiently Kotlin coroutines utilise resources.

That's all from us today! We hope you've enjoyed our journey through Kotlin coroutines and hopefully learned something new. Looking forward to seeing you again very soon!

Thanks for reading!
