DEV Community

Ryosuke Hasebe
Ryosuke Hasebe

Posted on

About Micrometer Context Propagation

Introduction

A library called Micrometer Context Propagation has been introduced in Spring Boot and Reactor (starting from Spring Boot 3).

As the name suggests, this library facilitates Context Propagation between threads.

In this article, I will explain Micrometer Context Propagation. Since this is a niche topic and there are not many articles covering it, I hope this will be helpful to someone.

Why was Micrometer Context Propagation created?

Originally, Micrometer became popular in the Java community as a metrics library. Over time, Micrometer expanded its scope to cover tracing and logging as well, essentially extending its reach to observability.

Generally (not just in Java), tracing requires a technique called Context Propagation. Micrometer Context Propagation is designed to extract and provide this technique in an easy-to-use manner.

It is not exclusive to Micrometer but is also designed to work easily with various libraries such as Kotlin Coroutine Context and Reactor Context.

In Spring Boot, there was a library called Spring Cloud Sleuth (hereafter referred to as Sleuth), which handled tracing in Spring WebMVC/WebFlux.
https://spring.io/projects/spring-cloud-sleuth

With Spring Boot 3, Sleuth was deprecated, and its functionalities were incorporated into Micrometer and Spring Boot.

What is Context Propagation?

As the name suggests, it is a technique for correctly propagating context. At this point, you might wonder, "What is context?" In this case, you can think of it as MDC or TraceContext (a collection of values such as traceId and spanId).

Traditionally, in Java, context propagation could be achieved by simply storing values in a Thread Local. In fact, both MDC and (brave's) TraceContext are stored in Thread Local.

... This might be hard to understand just from an explanation, so let’s look at a concrete example. Let's start with MDC, which is the most familiar case.

fun main() {
    MDC.put("hoge", "hoge-value")

    someFunc1()
    someFunc2()
}

fun someFunc1() {
    println("Called someFunc1 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}

fun someFunc2() {
    println("Called someFunc2 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}
Enter fullscreen mode Exit fullscreen mode

Here is the result. There should be no surprises here—the MDC setting remains intact.

Called someFunc1 func. thread=main, mdc={hoge=hoge-value}
Called someFunc2 func. thread=main, mdc={hoge=hoge-value}
Enter fullscreen mode Exit fullscreen mode

Now, what about the following example? Here, someFunc2 is executed on a separate thread.

fun main() {
    MDC.put("hoge", "hoge-value")

    someFunc1()
    thread { someFunc2() }.join()
}

fun someFunc1() {
    println("Called someFunc1 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}

fun someFunc2() {
    println("Called someFunc2 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}
Enter fullscreen mode Exit fullscreen mode

Here is the result. In someFunc2, the contents of MDC are null. Since MDC is based on Thread Local, it does not propagate correctly when the thread changes.

Called someFunc1 func. thread=main, mdc={hoge=hoge-value}
Called someFunc2 func. thread=Thread-0, mdc=null
Enter fullscreen mode Exit fullscreen mode

To propagate it correctly, we need to use a technique like the one below (which is what Context Propagation is). Here, we define a custom function called mdcPropagatingThread.

fun main() {
    MDC.put("hoge", "hoge-value")

    someFunc1()
    mdcPropagatingThread { someFunc2() }.join()
}

fun someFunc1() {
    println("Called someFunc1 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}

fun someFunc2() {
    println("Called someFunc2 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}

fun mdcPropagatingThread(block: () -> Unit): Thread {
    val map = MDC.getCopyOfContextMap()
    return thread {
        MDC.setContextMap(map)
        try {
            block()
        } finally {
            MDC.clear() // In this example, it's a disposable thread, so there's no need to clear it...
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Here is the result. The original MDC is successfully carried over to someFunc2.

Called someFunc1 func. thread=main, mdc={hoge=hoge-value}
Called someFunc2 func. thread=Thread-0, mdc={hoge=hoge-value}
Enter fullscreen mode Exit fullscreen mode

Micrometer Context Propagation provides an abstraction of such techniques as an OSS library.

The previous example can be rewritten using Micrometer Context Propagation as follows.
At first glance, you might think that the additional MDCAccessor class makes the code longer and more cumbersome. However, since it becomes reusable in various places, it actually simplifies things overall.

val snapshotFactory = ContextSnapshotFactory.builder().build()

fun main() {
    ContextRegistry.getInstance().registerThreadLocalAccessor(MDCAccessor())

    MDC.put("hoge", "hoge-value")

    someFunc1()
    contextPropagatingThread { someFunc2() }.join()
}

fun someFunc1() {
    println("Called someFunc1 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}

fun someFunc2() {
    println("Called someFunc2 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}

fun contextPropagatingThread(block: () -> Unit): Thread {
    val snapshot = snapshotFactory.captureAll()
    return thread {
        snapshot.setThreadLocals().use {
            block()
        }
    }
}  

class MDCAccessor : ThreadLocalAccessor<Map<String, String?>> {
    override fun key(): Any {
        return MDCAccessor::class.java
    }

    override fun getValue(): Map<String, String?>? {
        return MDC.getCopyOfContextMap()
    }

    override fun setValue(value: Map<String, String?>) {
        MDC.setContextMap(value)
    }

    override fun setValue() {
        MDC.clear()
    }
}
Enter fullscreen mode Exit fullscreen mode

How to Use Micrometer Context Propagation

The introduction took too long, but now we move to the main topic—how to use Micrometer Context Propagation.

The main classes (or concepts) to remember are the following four:

  • ThreadLocalAccessor
  • ContextAccessor
  • ContextRegistry
  • ContextSnapshot

ThreadLocalAccessor

When you want to propagate Thread Local values, implement the ThreadLocalAccessor interface.

You need to override the following methods:

Here are some examples.

Example: MDC

(Reiterated example)

class MDCAccessor : ThreadLocalAccessor<Map<String, String?>> {
    override fun key(): Any {
        return MDCAccessor::class.java
    }

    override fun getValue(): Map<String, String?>? {
        return MDC.getCopyOfContextMap()
    }

    override fun setValue(value: Map<String, String?>) {
        MDC.setContextMap(value)
    }

    override fun setValue() {
        MDC.clear()
    }
}
Enter fullscreen mode Exit fullscreen mode

However, since brave may update MDC, the above approach might miss some propagation. In some cases, it's better to use the following approach:

class MDCAccessor : ThreadLocalAccessor<Map<String, String?>> {
    override fun key(): Any {
        return MDCAccessor::class.java
    }

    override fun getValue(): Map<String, String?>? {
        return MDC.getCopyOfContextMap()
    }

    override fun setValue(value: Map<String, String?>) {
        // There are cases where it doesn't work well when used together with Brave's MDC decoration, so use the put method.
        // see: https://github.com/openzipkin/brave/blob/69003dfc811418f0dbc42e9e17ff880ebe1f4b02/brave/src/main/java/brave/propagation/CurrentTraceContext.java#L130
        value.forEach { (k, v) -> MDC.put(k, v) }
    }

    override fun setValue() {
        // NOOP
    }

    override fun restore(previousValue: Map<String, String?>) {
        MDC.setContextMap(previousValue)
    }

    override fun restore() {
        MDC.clear()
    }
}
Enter fullscreen mode Exit fullscreen mode

With the restore method, there's some flexibility.

Example: Brave Trace Context

(Not tested, but probably works like this.)

This is just an example, but if you use micrometer-tracing, you can use the built-in ObservationThreadLocalAccessor to propagate the Trace Context.

class BraveTracingContextAccessor : ThreadLocalAccessor<TraceContext> {
    override fun key(): Any {
        return BraveTracingContextAccessor::class.java
    }

    override fun getValue(): TraceContext? {
        return Tracing.current().currentTraceContext().get()
    }

    override fun setValue(value: TraceContext) {
        Tracing.current().currentTraceContext().maybeScope(value)
    }

    override fun setValue() {
        Tracing.current().currentTraceContext().maybeScope(null)
    }
}
Enter fullscreen mode Exit fullscreen mode

ContextAccessor

When you want to propagate Map-like objects such as Reactor Context, implement the ContextAccessor interface.

However, you probably won't need to implement this yourself often, so you don't need to remember it too much.

Example: Reactor Context

Reactor provides an implementation called ReactorContextAccessor.

/**
 * A {@code ContextAccessor} to enable reading values from a Reactor
 * {@link ContextView} and writing values to {@link Context}.
 * <p>
 * Please note that this public class implements the {@code libs.micrometer.contextPropagation}
 * SPI library, which is an optional dependency.
 *
 * @author Rossen Stoyanchev
 * @author Simon Baslé
 * @since 3.5.0
 */
public final class ReactorContextAccessor implements ContextAccessor<ContextView, Context> {

    @Override
    public Class<? extends ContextView> readableType() {
        return ContextView.class;
    }

    @Override
    public void readValues(ContextView source, Predicate<Object> keyPredicate, Map<Object, Object> target) {
        source.forEach((k, v) -> {
            if (keyPredicate.test(k)) {
                target.put(k, v);
            }
        });
    }

    @Override
    @Nullable
    public <T> T readValue(ContextView sourceContext, Object key) {
        return sourceContext.getOrDefault(key, null);
    }

    @Override
    public Class<? extends Context> writeableType() {
        return Context.class;
    }

    @Override
    public Context writeValues(Map<Object, Object> source, Context target) {
        return target.putAllMap(source);
    }
}
Enter fullscreen mode Exit fullscreen mode

ContextRegistry

ContextRegistry is an instance that holds the previously mentioned ThreadLocalAccessor and ContextAccessor.

You can register it like this. It is expected to be registered before starting the application in the main method.

ContextRegistry.getInstance()
    .registerThreadLocalAccessor(...) // ThreadLocalAccessor
    .registerContextAccessor(...) // ContextAccesor
Enter fullscreen mode Exit fullscreen mode

Although multiple instances of ContextRegistry can be created, it is recommended to share and use the instance obtained via ContextRegistry.getInstance(). At the very least, Reactor uses the instance obtained via ContextRegistry.getInstance().

Additionally, it can be automatically registered via the Service Loader.

ContextRegistry()
    .loadThreadLocalAccessors()
    .loadContextAccessors()
Enter fullscreen mode Exit fullscreen mode

The instance obtained via ContextRegistry.getInstance() is pre-registered through the Service Loader.
https://github.com/micrometer-metrics/context-propagation/blob/75a243f3427d0941e09302c3fc29f5b2a0297583/context-propagation/src/main/java/io/micrometer/context/ContextRegistry.java#L40-L41

For example, the ObservationThreadLocalAccessor for Micrometer Observation is automatically registered via the Service Loader.
https://github.com/micrometer-metrics/micrometer/blob/a56b968ba5b3db9b5e4a4feac813080783a16f5f/micrometer-observation/src/main/resources/META-INF/services/io.micrometer.context.ThreadLocalAccessor

ReactorContextAccessor is also automatically registered via the Service Loader.
https://github.com/reactor/reactor-core/blob/5553aa80137482ec26acb960f4d4f42b8a44da94/reactor-core/src/main/resources/META-INF/services/io.micrometer.context.ContextAccessor

ContextSnapshot

As the name suggests, this is a snapshot of a context. By calling setThreadLocals or updateContext implemented in this class, context can be propagated to various locations (such as different threads or Reactor Context).

A ContextSnapshot can be created via ContextSnapshotFactory.

// Retrieve from the registered ThreadLocalAccessor.
val snapshot = ContextSnapshotFactory.builder().build().captureAll()

// Retrieve from the registered ThreadLocalAccessor and Reactor Context. ReactorContextAccessor must be registered.
val snapshot = ContextSnapshotFactory.builder().build().captureAll(reactorContext)
Enter fullscreen mode Exit fullscreen mode

When creating a ContextSnapshotFactory, you can configure the following settings via the builder (the example above does not set anything):

Creating a ContextSnapshotFactory every time is cumbersome, so it is advisable to store it as a static variable or register it as a bean.

To propagate a ContextSnapshot to another thread, you can do something like this:

fun main() {
    ContextRegistry.getInstance().registerThreadLocalAccessor(MDCAccessor())

    MDC.put("hoge", "hoge-value")

    val snapshot = ContextSnapshotFactory.builder().build().captureAll()
    thread {
        snapshot.setThreadLocals().use {
            someFunc1()
        }
    }
}

fun someFunc1() {
    println("Called someFunc1 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}
Enter fullscreen mode Exit fullscreen mode

By specifying a keyPredicate when calling setThreadLocals, you can filter which ThreadLocalAccessors will be active.
For example, if you want to activate only MDCAccessor, you can do this:

fun main() {
    ContextRegistry.getInstance().registerThreadLocalAccessor(MDCAccessor())

    MDC.put("hoge", "hoge-value")

    val snapshot = ContextSnapshotFactory.builder().build().captureAll()
    thread {
        snapshot.setThreadLocals { key -> key == MDCAccessor::class.java }.use {
            someFunc1()
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Propagating to Reactor Context

To propagate to Reactor Context using ContextSnapshot, you can do the following. ReactorContextAccessor needs to be registered.

fun main() {
    ContextRegistry.getInstance().registerThreadLocalAccessor(MDCAccessor())
    val snapshotFactory = ContextSnapshotFactory.builder().build()

    MDC.put("hoge", "hoge-value")

    val snapshot = snapshotFactory.captureAll()
    Mono.delay(Duration.ofMillis(100))
        .flatMap { value ->
            Mono.deferContextual {
                // Propagated to the Reactor Context.
                println("thread=${Thread.currentThread().name}, mdcMap=${it.get<Map<String, String?>>(MDCAccessor::class.java)}")

                // It does not propagate to ThreadLocal.
                println("thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")

                // If you want to propagate it to ThreadLocal, write it like this again.
                snapshotFactory.captureFrom(it).setThreadLocals().use {
                    println("thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
                }

                Mono.just(value)
            }
        }
        // Propagate to the Reactor Context using the `updateContext` method.
        .contextWrite { snapshot.updateContext(it) }
        .block()
}

// Result
thread=parallel-1, mdcMap={hoge=hoge-value}
thread=parallel-1, mdc=null
thread=parallel-1, mdc={hoge=hoge-value}
Enter fullscreen mode Exit fullscreen mode

However, in Reactor, a convenient method called contextCapture is available, so you can write it like this:

fun main() {
    ContextRegistry.getInstance().registerThreadLocalAccessor(MDCAccessor())
    val snapshotFactory = ContextSnapshotFactory.builder().build()

    MDC.put("hoge", "hoge-value")

    Mono.delay(Duration.ofMillis(100))
        .flatMap { value ->
            Mono.deferContextual {
                // Propagated to the Reactor Context.
                println("thread=${Thread.currentThread().name}, mdcMap=${it.get<Map<String, String?>>(MDCAccessor::class.java)}")

                // It does not propagate to ThreadLocal.
                println("thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")

                // If you want to propagate it to ThreadLocal, write it like this again.
                snapshotFactory.captureFrom(it).setThreadLocals().use {
                    println("thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
                }

                Mono.just(value)
            }
        }
        .contextCapture() // This
        .block()
}
Enter fullscreen mode Exit fullscreen mode

... There's still quite a bit to write, making it inconvenient. Running Hooks.enableAutomaticContextPropagation() will automatically handle the following, making it significantly more convenient:

  • contextCapture() before block() (processing from thread local to Reactor context)
  • snapshotFactory.captureFrom(it).setThreadLocals() written in Mono.deferContextual (processing from Reactor context to thread local)
fun main() {
    ContextRegistry.getInstance().registerThreadLocalAccessor(MDCAccessor())
    Hooks.enableAutomaticContextPropagation()

    MDC.put("hoge", "hoge-value")

    Mono.delay(Duration.ofMillis(100))
        .flatMap { value ->
            Mono.deferContextual {
                // Propagated to the Reactor Context.
                println("thread=${Thread.currentThread().name}, mdcMap=${it.get<Map<String, String?>>(MDCAccessor::class.java)}")

                // Propagated to both Reactor Context and ThreadLocal.
                println("thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")

                Mono.just(value)
            }
        }
        .block()
}
Enter fullscreen mode Exit fullscreen mode
(Bonus) Kotlin Coroutines and Hooks.enableAutomaticContextPropagation

As mentioned earlier, using Hooks.enableAutomaticContextPropagation allows Mono.block and similar methods to automatically execute contextCapture(), but it does not work for Coroutine methods like awaitSingle.

ffun main() {
    ContextRegistry.getInstance().registerThreadLocalAccessor(MDCAccessor())
    Hooks.enableAutomaticContextPropagation()

    val job = GlobalScope.launch {
        MDC.put("hoge", "hoge-value")

        Mono.delay(Duration.ofMillis(100))
            .flatMap { value ->
                Mono.deferContextual {
                    // Not propagated to the Reactor Context.
                    println("thread=${Thread.currentThread().name}, mdcMap=${it.getOrDefault<Map<String, String?>>(MDCAccessor::class.java, null)}")

                    // Propagated to ThreadLocal.
                    println("thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")

                    Mono.just(value)
                }
            }
            .awaitSingle()
    }
    runBlocking { job.join() }
}

// Result
thread=parallel-1, mdcMap=null
thread=parallel-1, mdc={hoge=hoge-value}
Enter fullscreen mode Exit fullscreen mode

By defining the following class and registering it in the Service Loader, contextCapture will be automatically executed, just like with block. However, I am not entirely confident if this is the best approach yet.
(I have opened an issue about this, so if you're interested, please check it out → https://github.com/reactor/reactor-core/issues/3563)

package observation101.context.coroutines

import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.reactive.ContextInjector
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import kotlin.coroutines.CoroutineContext

@OptIn(InternalCoroutinesApi::class)
class ContextCaptureInjector : ContextInjector {
    override fun <T> injectCoroutineContext(publisher: Publisher<T>, coroutineContext: CoroutineContext): Publisher<T> {
        return when (publisher) {
            is Mono -> publisher.contextCapture()
            is Flux -> publisher.contextCapture()
            else -> publisher
        }
    }
}
Enter fullscreen mode Exit fullscreen mode
// Write the following in `META-INF/services/kotlinx.coroutines.reactive.ContextInjector`.

observation101.context.coroutines.ContextCaptureInjector
Enter fullscreen mode Exit fullscreen mode

Advanced Use Cases

Propagating Thread Local to Runnable/Callable

ContextSnapshot implements a method called wrap, which can be used for this purpose.

val snapshot = ContextSnapshotFactory.builder().build().captureAll()
val wrapped = snapshot.wrap {
    // ...
}

// Equivalent code to the above.
val snapshot = ContextSnapshotFactory.builder().build().captureAll()
val wrapped = Runnable {
    snapshot.setThreadLocals().use {
        // ...
    }
}
Enter fullscreen mode Exit fullscreen mode

Propagating Thread Local to ExecutorService / ScheduledExecutorService

ContextExecutorService and ContextScheduledExecutorService are implemented for this purpose. You can use them.

Propagating Thread Local to Kotlin CoroutineContext

Kotlin Coroutines provide a mechanism called ThreadContextElement for Thread Local. Connecting this with Micrometer Context Propagation will make it work.

class ContextPropagationThreadLocalElement : ThreadContextElement<AutoCloseable>, AbstractCoroutineContextElement(KEY) {
    private val snapshot = factory.captureAll()

    override fun updateThreadContext(context: CoroutineContext): AutoCloseable {
        return snapshot.setThreadLocals()
    }

    override fun restoreThreadContext(context: CoroutineContext, oldState: AutoCloseable) {
        oldState.close()
    }

    companion object {
        private val KEY = object : CoroutineContext.Key<ContextPropagationThreadLocalElement> {}
        private val factory = ContextSnapshotFactory.builder().build()
    }
}
Enter fullscreen mode Exit fullscreen mode

Conclusion

Since many Java ecosystems rely on Thread Local, using Reactor or Kotlin Coroutines can sometimes be challenging.

I originally wrote a similar library for work, but now that Micrometer provides an OSS solution, things have become much easier.

Additionally, as mentioned in this article, implementing in line with Micrometer Context Propagation makes it easy to integrate with Reactor and Kotlin Coroutines.

Previously, I had to write similar code separately for Reactor and Kotlin Coroutines, but Micrometer Context Propagation provides a good abstraction, allowing reuse.

Although this was quite a niche topic, I hope it is useful to someone.

Top comments (0)