Introduction
A library called Micrometer Context Propagation has been introduced in Spring Boot and Reactor (starting from Spring Boot 3).
- https://github.com/micrometer-metrics/context-propagation
- https://docs.micrometer.io/context-propagation/reference/
As the name suggests, this library facilitates Context Propagation between threads.
In this article, I will explain Micrometer Context Propagation. Since this is a niche topic and there are not many articles covering it, I hope this will be helpful to someone.
Why was Micrometer Context Propagation created?
Originally, Micrometer became popular in the Java community as a metrics library. Over time, Micrometer expanded its scope to cover tracing and logging as well, essentially extending its reach to observability.
Generally (not just in Java), tracing requires a technique called Context Propagation. Micrometer Context Propagation is designed to extract and provide this technique in an easy-to-use manner.
It is not exclusive to Micrometer but is also designed to work easily with various libraries such as Kotlin Coroutine Context and Reactor Context.
In Spring Boot, there was a library called Spring Cloud Sleuth (hereafter referred to as Sleuth), which handled tracing in Spring WebMVC/WebFlux.
https://spring.io/projects/spring-cloud-sleuth
With Spring Boot 3, Sleuth was deprecated, and its functionality was incorporated into Micrometer and Spring Boot.
What is Context Propagation?
As the name suggests, it is a technique for correctly propagating context. At this point, you might wonder, "What is context?" In this case, you can think of it as MDC or TraceContext (a collection of values such as traceId and spanId).
Traditionally, in Java, context propagation could be achieved by simply storing values in a Thread Local. In fact, both MDC and (brave's) TraceContext are stored in Thread Local.
This might be hard to grasp from an explanation alone, so let's look at a concrete example, starting with MDC, the most familiar case.
import org.slf4j.MDC
fun main() {
    MDC.put("hoge", "hoge-value")
    someFunc1()
    someFunc2()
}
fun someFunc1() {
    println("Called someFunc1 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}
fun someFunc2() {
    println("Called someFunc2 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}
Here is the result. There should be no surprises here—the MDC setting remains intact.
Called someFunc1 func. thread=main, mdc={hoge=hoge-value}
Called someFunc2 func. thread=main, mdc={hoge=hoge-value}
Now, what about the following example? Here, someFunc2 is executed on a separate thread.
import kotlin.concurrent.thread
import org.slf4j.MDC
fun main() {
    MDC.put("hoge", "hoge-value")
    someFunc1()
    thread { someFunc2() }.join()
}
fun someFunc1() {
    println("Called someFunc1 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}
fun someFunc2() {
    println("Called someFunc2 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}
Here is the result. In someFunc2, the contents of MDC are null. Since MDC is based on Thread Local, it does not propagate when the thread changes.
Called someFunc1 func. thread=main, mdc={hoge=hoge-value}
Called someFunc2 func. thread=Thread-0, mdc=null
To propagate it correctly, we need a technique like the one below (this is what Context Propagation is). Here, we define a custom function called mdcPropagatingThread.
fun main() {
    MDC.put("hoge", "hoge-value")
    someFunc1()
    mdcPropagatingThread { someFunc2() }.join()
}
fun someFunc1() {
    println("Called someFunc1 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}
fun someFunc2() {
    println("Called someFunc2 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}
fun mdcPropagatingThread(block: () -> Unit): Thread {
    // Copy the MDC on the calling thread, then install the copy on the new thread.
    val map = MDC.getCopyOfContextMap()
    return thread {
        MDC.setContextMap(map)
        try {
            block()
        } finally {
            MDC.clear() // In this example, it's a disposable thread, so there's no need to clear it...
        }
    }
}
Here is the result. The original MDC is successfully carried over to someFunc2.
Called someFunc1 func. thread=main, mdc={hoge=hoge-value}
Called someFunc2 func. thread=Thread-0, mdc={hoge=hoge-value}
Micrometer Context Propagation provides an abstraction of such techniques as an OSS library.
The previous example can be rewritten using Micrometer Context Propagation as follows.
At first glance, you might think that the additional MDCAccessor class makes the code longer and more cumbersome. However, since it becomes reusable in various places, it actually simplifies things overall.
import io.micrometer.context.ContextRegistry
import io.micrometer.context.ContextSnapshotFactory
import io.micrometer.context.ThreadLocalAccessor
import kotlin.concurrent.thread
import org.slf4j.MDC
val snapshotFactory = ContextSnapshotFactory.builder().build()
fun main() {
    ContextRegistry.getInstance().registerThreadLocalAccessor(MDCAccessor())
    MDC.put("hoge", "hoge-value")
    someFunc1()
    contextPropagatingThread { someFunc2() }.join()
}
fun someFunc1() {
    println("Called someFunc1 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}
fun someFunc2() {
    println("Called someFunc2 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}
fun contextPropagatingThread(block: () -> Unit): Thread {
    // Capture on the calling thread, apply on the new thread, restore when the scope closes.
    val snapshot = snapshotFactory.captureAll()
    return thread {
        snapshot.setThreadLocals().use {
            block()
        }
    }
}
class MDCAccessor : ThreadLocalAccessor<Map<String, String?>> {
    override fun key(): Any {
        return MDCAccessor::class.java
    }
    override fun getValue(): Map<String, String?>? {
        return MDC.getCopyOfContextMap()
    }
    override fun setValue(value: Map<String, String?>) {
        MDC.setContextMap(value)
    }
    override fun setValue() {
        MDC.clear()
    }
}
How to Use Micrometer Context Propagation
That was a long introduction, but now for the main topic: how to use Micrometer Context Propagation.
The main classes (or concepts) to remember are the following four:
- ThreadLocalAccessor
- ContextAccessor
- ContextRegistry
- ContextSnapshot
ThreadLocalAccessor
When you want to propagate Thread Local values, implement the ThreadLocalAccessor interface.
You need to override the following methods:
- Object key()
  - Returns a unique key.
  - You can use Class<?> (e.g., SomeThreadLocalAccessor::class.java).
- @Nullable V getValue()
  - Returns the value to be propagated.
  - Usually retrieved from Thread Local.
- void setValue(V value)
  - Sets the value to be propagated.
  - Typically stored in Thread Local.
  - V is non-null.
  - https://github.com/micrometer-metrics/context-propagation/blob/f6026d7e8ac833a822bc2032ac44d462b04c0775/context-propagation/src/main/java/io/micrometer/context/DefaultContextSnapshot.java#L86
- void setValue()
  - Not needed if you override restore(), but required when clearMissing is enabled.
  - Called when the propagated value is null.
  - https://github.com/micrometer-metrics/context-propagation/blob/f6026d7e8ac833a822bc2032ac44d462b04c0775/context-propagation/src/main/java/io/micrometer/context/DefaultContextSnapshot.java#L89
- void restore(V previousValue)
  - Not mandatory. If not implemented, setValue(V value) is used instead.
  - Called when restoring the original value.
  - V is non-null.
  - https://github.com/micrometer-metrics/context-propagation/blob/f6026d7e8ac833a822bc2032ac44d462b04c0775/context-propagation/src/main/java/io/micrometer/context/DefaultContextSnapshot.java#L149
- void restore()
  - Not mandatory. If not implemented, setValue() is used instead.
  - Called when restoring a null value.
  - https://github.com/micrometer-metrics/context-propagation/blob/f6026d7e8ac833a822bc2032ac44d462b04c0775/context-propagation/src/main/java/io/micrometer/context/DefaultContextSnapshot.java#L152
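To make the order of these callbacks concrete, here is a small stand-alone model of what a snapshot does with a single accessor. It does not use the library: SimpleAccessor, RecordingAccessor, runWithValue and demo are hypothetical names made up for this sketch, and the real DefaultContextSnapshot also handles multiple accessors and key predicates. Everything runs on one thread, because only the callback order matters here.

```kotlin
// Hypothetical stand-in for ThreadLocalAccessor, reduced to the callbacks discussed above.
interface SimpleAccessor<V : Any> {
    fun getValue(): V?
    fun setValue(value: V)
    fun setValue()
    fun restore(previousValue: V) = setValue(previousValue)
    fun restore() = setValue()
}

// Accessor over a plain ThreadLocal that records which callback was invoked.
class RecordingAccessor(private val local: ThreadLocal<String>) : SimpleAccessor<String> {
    val calls = mutableListOf<String>()
    override fun getValue(): String? = local.get()
    override fun setValue(value: String) { calls += "setValue($value)"; local.set(value) }
    override fun setValue() { calls += "setValue()"; local.remove() }
    override fun restore(previousValue: String) { calls += "restore($previousValue)"; local.set(previousValue) }
    override fun restore() { calls += "restore()"; local.remove() }
}

// Simplified model of setThreadLocals() followed by closing the returned scope.
fun <V : Any> runWithValue(accessor: SimpleAccessor<V>, captured: V, block: () -> Unit) {
    val previous = accessor.getValue()   // remember what the thread held before
    accessor.setValue(captured)          // install the propagated value
    try {
        block()
    } finally {
        // Restore with the previous value if there was one, otherwise with the no-arg variant.
        if (previous != null) accessor.restore(previous) else accessor.restore()
    }
}

fun demo(): List<String> {
    val local = ThreadLocal<String>()
    val accessor = RecordingAccessor(local)
    // Case 1: the thread had no value before, so restore() is called on exit.
    runWithValue(accessor, "a") { }
    // Case 2: the thread already had a value, so restore(previousValue) is called on exit.
    local.set("old")
    runWithValue(accessor, "b") { }
    return accessor.calls
}

fun main() {
    println(demo()) // [setValue(a), restore(), setValue(b), restore(old)]
}
```

If RecordingAccessor did not override the two restore methods, the interface defaults would fall back to setValue(V) and setValue(), which mirrors the "If not implemented" notes in the list.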
Here are some examples.
Example: MDC
(Reiterated example)
class MDCAccessor : ThreadLocalAccessor<Map<String, String?>> {
    override fun key(): Any {
        return MDCAccessor::class.java
    }
    override fun getValue(): Map<String, String?>? {
        return MDC.getCopyOfContextMap()
    }
    override fun setValue(value: Map<String, String?>) {
        MDC.setContextMap(value)
    }
    override fun setValue() {
        MDC.clear()
    }
}
However, since Brave may also update MDC, the above approach might lose some of the propagated values. In some cases, it's better to use the following approach:
class MDCAccessor : ThreadLocalAccessor<Map<String, String?>> {
    override fun key(): Any {
        return MDCAccessor::class.java
    }
    override fun getValue(): Map<String, String?>? {
        return MDC.getCopyOfContextMap()
    }
    override fun setValue(value: Map<String, String?>) {
        // There are cases where it doesn't work well when used together with Brave's MDC decoration, so use the put method.
        // see: https://github.com/openzipkin/brave/blob/69003dfc811418f0dbc42e9e17ff880ebe1f4b02/brave/src/main/java/brave/propagation/CurrentTraceContext.java#L130
        value.forEach { (k, v) -> MDC.put(k, v) }
    }
    override fun setValue() {
        // NOOP
    }
    override fun restore(previousValue: Map<String, String?>) {
        MDC.setContextMap(previousValue)
    }
    override fun restore() {
        MDC.clear()
    }
}
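Overriding the restore methods gives you this flexibility: here, values are merged in with put when propagating, while the previous map is put back as-is (or cleared) when restoring.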
Example: Brave Trace Context
(Not tested, but probably works like this.)
This is just an example, but if you use micrometer-tracing, you can use the built-in ObservationThreadLocalAccessor to propagate the Trace Context.
class BraveTracingContextAccessor : ThreadLocalAccessor<TraceContext> {
    override fun key(): Any {
        return BraveTracingContextAccessor::class.java
    }
    override fun getValue(): TraceContext? {
        return Tracing.current().currentTraceContext().get()
    }
    override fun setValue(value: TraceContext) {
        Tracing.current().currentTraceContext().maybeScope(value)
    }
    override fun setValue() {
        Tracing.current().currentTraceContext().maybeScope(null)
    }
}
ContextAccessor
When you want to propagate Map-like objects such as Reactor Context, implement the ContextAccessor interface.
However, you probably won't need to implement this yourself often, so you don't need to remember it too much.
Example: Reactor Context
Reactor provides an implementation called ReactorContextAccessor.
/**
 * A {@code ContextAccessor} to enable reading values from a Reactor
 * {@link ContextView} and writing values to {@link Context}.
 * <p>
 * Please note that this public class implements the {@code libs.micrometer.contextPropagation}
 * SPI library, which is an optional dependency.
 *
 * @author Rossen Stoyanchev
 * @author Simon Baslé
 * @since 3.5.0
 */
public final class ReactorContextAccessor implements ContextAccessor<ContextView, Context> {
    @Override
    public Class<? extends ContextView> readableType() {
        return ContextView.class;
    }
    @Override
    public void readValues(ContextView source, Predicate<Object> keyPredicate, Map<Object, Object> target) {
        source.forEach((k, v) -> {
            if (keyPredicate.test(k)) {
                target.put(k, v);
            }
        });
    }
    @Override
    @Nullable
    public <T> T readValue(ContextView sourceContext, Object key) {
        return sourceContext.getOrDefault(key, null);
    }
    @Override
    public Class<? extends Context> writeableType() {
        return Context.class;
    }
    @Override
    public Context writeValues(Map<Object, Object> source, Context target) {
        return target.putAllMap(source);
    }
}
ContextRegistry
ContextRegistry holds the ThreadLocalAccessor and ContextAccessor instances described above.
You can register them like this. Registration is expected to happen before the application starts, for example in the main method.
ContextRegistry.getInstance()
    .registerThreadLocalAccessor(...) // ThreadLocalAccessor
    .registerContextAccessor(...) // ContextAccessor
Although multiple instances of ContextRegistry can be created, it is recommended to share and use the instance obtained via ContextRegistry.getInstance(). At the very least, Reactor uses the instance obtained via ContextRegistry.getInstance().
Additionally, accessors can be registered automatically via the Service Loader.
ContextRegistry()
    .loadThreadLocalAccessors()
    .loadContextAccessors()
The instance obtained via ContextRegistry.getInstance() is pre-registered through the Service Loader.
https://github.com/micrometer-metrics/context-propagation/blob/75a243f3427d0941e09302c3fc29f5b2a0297583/context-propagation/src/main/java/io/micrometer/context/ContextRegistry.java#L40-L41
For example, the ObservationThreadLocalAccessor for Micrometer Observation is automatically registered via the Service Loader.
https://github.com/micrometer-metrics/micrometer/blob/a56b968ba5b3db9b5e4a4feac813080783a16f5f/micrometer-observation/src/main/resources/META-INF/services/io.micrometer.context.ThreadLocalAccessor
ReactorContextAccessor is also automatically registered via the Service Loader.
https://github.com/reactor/reactor-core/blob/5553aa80137482ec26acb960f4d4f42b8a44da94/reactor-core/src/main/resources/META-INF/services/io.micrometer.context.ContextAccessor
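The same mechanism should work for your own accessor if you add a provider-configuration file to your resources. The file name below is the one Micrometer Observation uses in the link above; com.example.MDCAccessor is a hypothetical placeholder for wherever your accessor class actually lives. Keep in mind that the Service Loader instantiates the class itself, so the class needs a public no-argument constructor.

```text
# src/main/resources/META-INF/services/io.micrometer.context.ThreadLocalAccessor
com.example.MDCAccessor
```

With this file on the classpath, the explicit registerThreadLocalAccessor(MDCAccessor()) call should no longer be needed for the instance obtained via ContextRegistry.getInstance().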
ContextSnapshot
As the name suggests, this is a snapshot of a context. By calling setThreadLocals or updateContext implemented in this class, context can be propagated to various locations (such as different threads or Reactor Context).
A ContextSnapshot can be created via ContextSnapshotFactory.
// Retrieve from the registered ThreadLocalAccessor.
val snapshot = ContextSnapshotFactory.builder().build().captureAll()
// Retrieve from the registered ThreadLocalAccessor and Reactor Context. ReactorContextAccessor must be registered.
val snapshot = ContextSnapshotFactory.builder().build().captureAll(reactorContext)
When creating a ContextSnapshotFactory, you can configure the following settings via the builder (the example above does not set anything):
- contextRegistry
  - By default, it uses the instance obtained from ContextRegistry.getInstance().
  - It is generally fine to leave it as the default. Specify it if necessary.
- captureKeyPredicate
  - You can specify which ThreadLocalAccessor to capture using the value of ThreadLocalAccessor#key(). If not specified, all registered ThreadLocalAccessors will be used.
  - It is generally fine to leave it as the default. Specify it if necessary.
- clearMissing
  - Determines whether to call the no-argument ThreadLocalAccessor#setValue() when the value to be propagated is missing.
  - https://github.com/micrometer-metrics/context-propagation/blob/f6026d7e8ac833a822bc2032ac44d462b04c0775/context-propagation/src/main/java/io/micrometer/context/DefaultContextSnapshot.java#L89
  - It is generally fine to leave it as the default. Specify it if necessary.
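The effect of clearMissing is easiest to see on a reused thread that still holds a stale value. The following stand-alone sketch models that decision with a plain ThreadLocal and the JDK executor API only. applySnapshot and valueSeenByWorker are hypothetical helpers written for this illustration, not library API, and the restore step is left out for brevity.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

val requestId = ThreadLocal<String>()

// Simplified model: apply a captured value (possibly absent) to the current thread.
fun applySnapshot(captured: String?, clearMissing: Boolean) {
    when {
        captured != null -> requestId.set(captured)  // corresponds to setValue(V)
        clearMissing -> requestId.remove()           // corresponds to the no-arg setValue()
        // otherwise the thread keeps whatever it already holds
    }
}

// Runs a task on a single worker thread that already carries a stale value.
fun valueSeenByWorker(clearMissing: Boolean): String? {
    val pool = Executors.newSingleThreadExecutor()
    try {
        // Simulate a value left behind by an earlier task on the same pooled thread.
        pool.submit(Runnable { requestId.set("stale") }).get()
        val captured: String? = null // the submitting side had nothing to propagate
        return pool.submit(Callable<String?> {
            applySnapshot(captured, clearMissing)
            requestId.get()
        }).get()
    } finally {
        pool.shutdown()
    }
}

fun main() {
    println(valueSeenByWorker(clearMissing = false)) // stale
    println(valueSeenByWorker(clearMissing = true))  // null
}
```

In this model, leaving clearMissing off means a missing value is simply skipped, so a pooled thread can keep showing a leftover value. Turning it on trades that for an extra clear on every propagation.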
Creating a ContextSnapshotFactory every time is cumbersome, so it is advisable to store it as a static variable or register it as a bean.
To propagate a ContextSnapshot to another thread, you can do something like this:
fun main() {
    ContextRegistry.getInstance().registerThreadLocalAccessor(MDCAccessor())
    MDC.put("hoge", "hoge-value")
    val snapshot = ContextSnapshotFactory.builder().build().captureAll()
    thread {
        snapshot.setThreadLocals().use {
            someFunc1()
        }
    }
}
fun someFunc1() {
    println("Called someFunc1 func. thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
}
By specifying a keyPredicate when calling setThreadLocals, you can filter which ThreadLocalAccessors will be active.
For example, if you want to activate only MDCAccessor, you can do this:
fun main() {
    ContextRegistry.getInstance().registerThreadLocalAccessor(MDCAccessor())
    MDC.put("hoge", "hoge-value")
    val snapshot = ContextSnapshotFactory.builder().build().captureAll()
    thread {
        snapshot.setThreadLocals { key -> key == MDCAccessor::class.java }.use {
            someFunc1()
        }
    }
}
Propagating to Reactor Context
To propagate to Reactor Context using ContextSnapshot, you can do the following. ReactorContextAccessor needs to be registered.
fun main() {
    ContextRegistry.getInstance().registerThreadLocalAccessor(MDCAccessor())
    val snapshotFactory = ContextSnapshotFactory.builder().build()
    MDC.put("hoge", "hoge-value")
    val snapshot = snapshotFactory.captureAll()
    Mono.delay(Duration.ofMillis(100))
        .flatMap { value ->
            Mono.deferContextual {
                // Propagated to the Reactor Context.
                println("thread=${Thread.currentThread().name}, mdcMap=${it.get<Map<String, String?>>(MDCAccessor::class.java)}")
                // It does not propagate to ThreadLocal.
                println("thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
                // If you want to propagate it to ThreadLocal, write it like this again.
                snapshotFactory.captureFrom(it).setThreadLocals().use {
                    println("thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
                }
                Mono.just(value)
            }
        }
        // Propagate to the Reactor Context using the `updateContext` method.
        .contextWrite { snapshot.updateContext(it) }
        .block()
}
// Result
thread=parallel-1, mdcMap={hoge=hoge-value}
thread=parallel-1, mdc=null
thread=parallel-1, mdc={hoge=hoge-value}
However, in Reactor, a convenient method called contextCapture is available, so you can write it like this:
fun main() {
    ContextRegistry.getInstance().registerThreadLocalAccessor(MDCAccessor())
    val snapshotFactory = ContextSnapshotFactory.builder().build()
    MDC.put("hoge", "hoge-value")
    Mono.delay(Duration.ofMillis(100))
        .flatMap { value ->
            Mono.deferContextual {
                // Propagated to the Reactor Context.
                println("thread=${Thread.currentThread().name}, mdcMap=${it.get<Map<String, String?>>(MDCAccessor::class.java)}")
                // It does not propagate to ThreadLocal.
                println("thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
                // If you want to propagate it to ThreadLocal, write it like this again.
                snapshotFactory.captureFrom(it).setThreadLocals().use {
                    println("thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
                }
                Mono.just(value)
            }
        }
        .contextCapture() // This
        .block()
}
That is still quite a bit to write, which is inconvenient. Running Hooks.enableAutomaticContextPropagation() will automatically handle the following, making it significantly more convenient:
- contextCapture() before block() (processing from thread local to Reactor context)
- snapshotFactory.captureFrom(it).setThreadLocals() written in Mono.deferContextual (processing from Reactor context to thread local)
fun main() {
    ContextRegistry.getInstance().registerThreadLocalAccessor(MDCAccessor())
    Hooks.enableAutomaticContextPropagation()
    MDC.put("hoge", "hoge-value")
    Mono.delay(Duration.ofMillis(100))
        .flatMap { value ->
            Mono.deferContextual {
                // Propagated to the Reactor Context.
                println("thread=${Thread.currentThread().name}, mdcMap=${it.get<Map<String, String?>>(MDCAccessor::class.java)}")
                // Propagated to both Reactor Context and ThreadLocal.
                println("thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
                Mono.just(value)
            }
        }
        .block()
}
(Bonus) Kotlin Coroutines and Hooks.enableAutomaticContextPropagation
As mentioned earlier, using Hooks.enableAutomaticContextPropagation allows Mono.block and similar methods to automatically execute contextCapture(), but it does not work for Coroutine methods like awaitSingle.
fun main() {
    ContextRegistry.getInstance().registerThreadLocalAccessor(MDCAccessor())
    Hooks.enableAutomaticContextPropagation()
    val job = GlobalScope.launch {
        MDC.put("hoge", "hoge-value")
        Mono.delay(Duration.ofMillis(100))
            .flatMap { value ->
                Mono.deferContextual {
                    // Not propagated to the Reactor Context.
                    println("thread=${Thread.currentThread().name}, mdcMap=${it.getOrDefault<Map<String, String?>>(MDCAccessor::class.java, null)}")
                    // Propagated to ThreadLocal.
                    println("thread=${Thread.currentThread().name}, mdc=${MDC.getCopyOfContextMap()}")
                    Mono.just(value)
                }
            }
            .awaitSingle()
    }
    runBlocking { job.join() }
}
// Result
thread=parallel-1, mdcMap=null
thread=parallel-1, mdc={hoge=hoge-value}
By defining the following class and registering it in the Service Loader, contextCapture will be automatically executed, just like with block. However, I am not entirely confident that this is the best approach yet.
(I have opened an issue about this, so if you're interested, please check it out → https://github.com/reactor/reactor-core/issues/3563)
package observation101.context.coroutines
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.reactive.ContextInjector
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import kotlin.coroutines.CoroutineContext
@OptIn(InternalCoroutinesApi::class)
class ContextCaptureInjector : ContextInjector {
    override fun <T> injectCoroutineContext(publisher: Publisher<T>, coroutineContext: CoroutineContext): Publisher<T> {
        return when (publisher) {
            is Mono -> publisher.contextCapture()
            is Flux -> publisher.contextCapture()
            else -> publisher
        }
    }
}
// Write the following in `META-INF/services/kotlinx.coroutines.reactive.ContextInjector`.
observation101.context.coroutines.ContextCaptureInjector
Advanced Use Cases
Propagating Thread Local to Runnable/Callable
ContextSnapshot implements a method called wrap, which can be used for this purpose.
val snapshot = ContextSnapshotFactory.builder().build().captureAll()
val wrapped = snapshot.wrap {
    // ...
}
// Equivalent code to the above.
val snapshot = ContextSnapshotFactory.builder().build().captureAll()
val wrapped = Runnable {
    snapshot.setThreadLocals().use {
        // ...
    }
}
Propagating Thread Local to ExecutorService / ScheduledExecutorService
ContextExecutorService and ContextScheduledExecutorService are implemented for this purpose. You can use them.
- https://github.com/micrometer-metrics/context-propagation/blob/f6026d7e8ac833a822bc2032ac44d462b04c0775/context-propagation/src/main/java/io/micrometer/context/ContextExecutorService.java
- https://github.com/micrometer-metrics/context-propagation/blob/f6026d7e8ac833a822bc2032ac44d462b04c0775/context-propagation/src/main/java/io/micrometer/context/ContextScheduledExecutorService.java
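Conceptually, such a wrapper captures the context on the thread that submits a task and re-applies it on the worker thread that runs the task. The sketch below shows that idea with a plain ThreadLocal and java.util.concurrent only. PropagatingExecutor and userSeenByWorker are hypothetical names for illustration. This is not the library's ContextExecutorService, which works with a full ContextSnapshot rather than a single value.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executor
import java.util.concurrent.Executors

val currentUser = ThreadLocal<String>()

// Hypothetical wrapper: captures on the submitting thread, applies on the worker thread.
class PropagatingExecutor(private val delegate: Executor) : Executor {
    override fun execute(command: Runnable) {
        val captured: String? = currentUser.get()      // runs on the caller's thread
        delegate.execute {
            val previous: String? = currentUser.get()  // runs on the worker thread
            if (captured != null) currentUser.set(captured) else currentUser.remove()
            try {
                command.run()
            } finally {
                // Put the worker back the way it was, since pooled threads are reused.
                if (previous != null) currentUser.set(previous) else currentUser.remove()
            }
        }
    }
}

fun userSeenByWorker(): String? {
    val pool = Executors.newSingleThreadExecutor()
    try {
        currentUser.set("alice")
        val result = CompletableFuture<String?>()
        // The task itself knows nothing about propagation; the wrapper takes care of it.
        PropagatingExecutor(pool).execute { result.complete(currentUser.get()) }
        return result.get()
    } finally {
        currentUser.remove()
        pool.shutdown()
    }
}

fun main() {
    println(userSeenByWorker()) // alice
}
```

The point of wrapping the executor rather than each task is that call sites stay unchanged: code that submits work does not need to remember to capture anything.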
Propagating Thread Local to Kotlin CoroutineContext
Kotlin Coroutines provide a mechanism called ThreadContextElement for Thread Local. Connecting this with Micrometer Context Propagation will make it work.
import io.micrometer.context.ContextSnapshotFactory
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.ThreadContextElement
class ContextPropagationThreadLocalElement : ThreadContextElement<AutoCloseable>, AbstractCoroutineContextElement(KEY) {
    // Captured once, on the thread that creates this element.
    private val snapshot = factory.captureAll()
    override fun updateThreadContext(context: CoroutineContext): AutoCloseable {
        return snapshot.setThreadLocals()
    }
    override fun restoreThreadContext(context: CoroutineContext, oldState: AutoCloseable) {
        oldState.close()
    }
    companion object {
        private val KEY = object : CoroutineContext.Key<ContextPropagationThreadLocalElement> {}
        private val factory = ContextSnapshotFactory.builder().build()
    }
}
Conclusion
Because much of the Java ecosystem relies on Thread Local, using Reactor or Kotlin Coroutines can sometimes be challenging.
I originally wrote a similar library for work, but now that Micrometer provides an OSS solution, things have become much easier.
Additionally, as mentioned in this article, implementing in line with Micrometer Context Propagation makes it easy to integrate with Reactor and Kotlin Coroutines.
Previously, I had to write similar code separately for Reactor and Kotlin Coroutines, but Micrometer Context Propagation provides a good abstraction, allowing reuse.
Although this was quite a niche topic, I hope it is useful to someone.