Skip to content

Commit 37943d0

Browse files
authored
[JAVA-6028] Add Micrometer/OpenTelemetry tracing support to the reactive-streams (#1898)
* Add Micrometer/OpenTelemetry tracing support to the reactive-streams driver https://jira.mongodb.org/browse/JAVA-6028 Port the tracing infrastructure from the sync driver to driver-reactive-streams, reusing the existing driver-core, TracingManager, Span, and TraceContext classes. * Move error handling and span lifecycle (span.error(), span.end()) from Reactor's doOnError/doFinally operators into the async callback, before emitting the result to the subscriber. * Making sure span is properly closed when an exception occurs
1 parent e5d8668 commit 37943d0

18 files changed

Lines changed: 453 additions & 105 deletions

File tree

driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -607,24 +607,61 @@ private <T> void sendAndReceiveAsyncInternal(final CommandMessage message, final
607607
ByteBufferBsonOutput bsonOutput = new ByteBufferBsonOutput(this);
608608
ByteBufferBsonOutput compressedBsonOutput = new ByteBufferBsonOutput(this);
609609

610+
Span tracingSpan = null;
610611
try {
611612
message.encode(bsonOutput, operationContext);
612613

614+
tracingSpan = operationContext
615+
.getTracingManager()
616+
.createTracingSpan(message,
617+
operationContext,
618+
() -> message.getCommandDocument(bsonOutput),
619+
cmdName -> SECURITY_SENSITIVE_COMMANDS.contains(cmdName)
620+
|| SECURITY_SENSITIVE_HELLO_COMMANDS.contains(cmdName),
621+
() -> getDescription().getServerAddress(),
622+
() -> getDescription().getConnectionId()
623+
);
624+
613625
CommandEventSender commandEventSender;
614-
if (isLoggingCommandNeeded()) {
615-
BsonDocument commandDocument = message.getCommandDocument(bsonOutput);
626+
boolean isLoggingCommandNeeded = isLoggingCommandNeeded();
627+
boolean isTracingCommandPayloadNeeded = tracingSpan != null && operationContext.getTracingManager().isCommandPayloadEnabled();
628+
629+
BsonDocument commandDocument = null;
630+
if (isLoggingCommandNeeded || isTracingCommandPayloadNeeded) {
631+
commandDocument = message.getCommandDocument(bsonOutput);
632+
}
633+
if (isLoggingCommandNeeded) {
616634
commandEventSender = new LoggingCommandEventSender(
617635
SECURITY_SENSITIVE_COMMANDS, SECURITY_SENSITIVE_HELLO_COMMANDS, description, commandListener,
618636
operationContext, message, commandDocument,
619637
COMMAND_PROTOCOL_LOGGER, loggerSettings);
620638
} else {
621639
commandEventSender = new NoOpCommandEventSender();
622640
}
641+
if (isTracingCommandPayloadNeeded) {
642+
tracingSpan.tagHighCardinality(QUERY_TEXT.asString(), commandDocument);
643+
}
644+
645+
final Span commandSpan = tracingSpan;
646+
SingleResultCallback<T> tracingCallback = commandSpan == null ? callback : (result, t) -> {
647+
try {
648+
if (t != null) {
649+
if (t instanceof MongoCommandException) {
650+
commandSpan.tagLowCardinality(
651+
RESPONSE_STATUS_CODE.withValue(String.valueOf(((MongoCommandException) t).getErrorCode())));
652+
}
653+
commandSpan.error(t);
654+
}
655+
} finally {
656+
commandSpan.end();
657+
callback.onResult(result, t);
658+
}
659+
};
623660

624661
commandEventSender.sendStartedEvent();
625662
Compressor localSendCompressor = sendCompressor;
626663
if (localSendCompressor == null || SECURITY_SENSITIVE_COMMANDS.contains(message.getCommandDocument(bsonOutput).getFirstKey())) {
627-
sendCommandMessageAsync(message.getId(), decoder, operationContext, callback, bsonOutput, commandEventSender,
664+
sendCommandMessageAsync(message.getId(), decoder, operationContext, tracingCallback, bsonOutput, commandEventSender,
628665
message.isResponseExpected());
629666
} else {
630667
List<ByteBuf> byteBuffers = bsonOutput.getByteBuffers();
@@ -636,12 +673,16 @@ private <T> void sendAndReceiveAsyncInternal(final CommandMessage message, final
636673
ResourceUtil.release(byteBuffers);
637674
bsonOutput.close();
638675
}
639-
sendCommandMessageAsync(message.getId(), decoder, operationContext, callback, compressedBsonOutput, commandEventSender,
676+
sendCommandMessageAsync(message.getId(), decoder, operationContext, tracingCallback, compressedBsonOutput, commandEventSender,
640677
message.isResponseExpected());
641678
}
642679
} catch (Throwable t) {
643680
bsonOutput.close();
644681
compressedBsonOutput.close();
682+
if (tracingSpan != null) {
683+
tracingSpan.error(t);
684+
tracingSpan.end();
685+
}
645686
callback.onResult(null, t);
646687
}
647688
}

driver-core/src/main/com/mongodb/internal/observability/micrometer/TracingManager.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.COLLECTION;
3939
import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.COMMAND_NAME;
4040
import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.NAMESPACE;
41+
import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.OPERATION_NAME;
42+
import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.OPERATION_SUMMARY;
4143
import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.NETWORK_TRANSPORT;
4244
import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.QUERY_SUMMARY;
4345
import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.SERVER_ADDRESS;
@@ -266,4 +268,47 @@ public Span createTracingSpan(final CommandMessage message,
266268

267269
return span;
268270
}
271+
272+
/**
273+
* Creates an operation-level tracing span for a database command.
274+
* <p>
275+
* The span is named "{commandName} {database}[.{collection}]" and tagged with standard
276+
* low-cardinality attributes (system, namespace, collection, operation name, operation summary).
277+
* The span is also set on the {@link OperationContext} for use by downstream command-level tracing.
278+
*
279+
* @param transactionSpan the active transaction span (for parent context), or null
280+
* @param operationContext the operation context to attach the span to
281+
* @param commandName the name of the command (e.g. "find", "insert")
282+
* @param namespace the MongoDB namespace for the operation
283+
* @return the created span, or null if tracing is disabled
284+
*/
285+
@Nullable
286+
public Span createOperationSpan(@Nullable final TransactionSpan transactionSpan,
287+
final OperationContext operationContext, final String commandName, final MongoNamespace namespace) {
288+
if (!isEnabled()) {
289+
return null;
290+
}
291+
TraceContext parentContext = null;
292+
if (transactionSpan != null) {
293+
parentContext = transactionSpan.getContext();
294+
}
295+
String name = commandName + " " + namespace.getDatabaseName()
296+
+ (MongoNamespaceHelper.COMMAND_COLLECTION_NAME.equalsIgnoreCase(namespace.getCollectionName())
297+
? ""
298+
: "." + namespace.getCollectionName());
299+
300+
KeyValues keyValues = KeyValues.of(
301+
SYSTEM.withValue("mongodb"),
302+
NAMESPACE.withValue(namespace.getDatabaseName()));
303+
if (!MongoNamespaceHelper.COMMAND_COLLECTION_NAME.equalsIgnoreCase(namespace.getCollectionName())) {
304+
keyValues = keyValues.and(COLLECTION.withValue(namespace.getCollectionName()));
305+
}
306+
keyValues = keyValues.and(OPERATION_NAME.withValue(commandName),
307+
OPERATION_SUMMARY.withValue(name));
308+
309+
Span span = addSpan(name, parentContext, namespace);
310+
span.tagLowCardinality(keyValues);
311+
operationContext.setTracingSpan(span);
312+
return span;
313+
}
269314
}

driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ClientSession.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import com.mongodb.ClientSessionOptions
1919
import com.mongodb.ServerAddress
2020
import com.mongodb.TransactionOptions
2121
import com.mongodb.internal.TimeoutContext
22+
import com.mongodb.internal.observability.micrometer.TransactionSpan
2223
import com.mongodb.reactivestreams.client.ClientSession as reactiveClientSession
2324
import com.mongodb.session.ClientSession as jClientSession
2425
import com.mongodb.session.ServerSession
@@ -58,6 +59,9 @@ public class ClientSession(public val wrapped: reactiveClientSession) : jClientS
5859
*/
5960
public fun notifyOperationInitiated(operation: Any): Unit = wrapped.notifyOperationInitiated(operation)
6061

62+
/** Get the transaction span (if started). */
63+
public fun getTransactionSpan(): TransactionSpan? = wrapped.transactionSpan
64+
6165
/**
6266
* Get the server address of the pinned mongos on this session. For internal use only.
6367
*

driver-reactive-streams/build.gradle.kts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
import ProjectExtensions.configureJarManifest
1717
import ProjectExtensions.configureMavenPublication
18+
import project.DEFAULT_JAVA_VERSION
1819

1920
plugins {
2021
id("project.java")
@@ -36,6 +37,9 @@ dependencies {
3637
implementation(libs.project.reactor.core)
3738
compileOnly(project(path = ":mongodb-crypt", configuration = "default"))
3839

40+
optionalImplementation(platform(libs.micrometer.observation.bom))
41+
optionalImplementation(libs.micrometer.observation)
42+
3943
testImplementation(libs.project.reactor.test)
4044
testImplementation(project(path = ":driver-sync", configuration = "default"))
4145
testImplementation(project(path = ":bson", configuration = "testArtifacts"))
@@ -45,11 +49,20 @@ dependencies {
4549
// Reactive Streams TCK testing
4650
testImplementation(libs.reactive.streams.tck)
4751

48-
// Tracing
52+
// Tracing testing
4953
testImplementation(platform(libs.micrometer.tracing.integration.test.bom))
5054
testImplementation(libs.micrometer.tracing.integration.test) { exclude(group = "org.junit.jupiter") }
5155
}
5256

57+
tasks.withType<Test> {
58+
// Needed for MicrometerProseTest to set env variable programmatically (calls
59+
// `field.setAccessible(true)`)
60+
val testJavaVersion: Int = findProperty("javaVersion")?.toString()?.toInt() ?: DEFAULT_JAVA_VERSION
61+
if (testJavaVersion >= DEFAULT_JAVA_VERSION) {
62+
jvmArgs("--add-opens=java.base/java.util=ALL-UNNAMED")
63+
}
64+
}
65+
5366
configureMavenPublication {
5467
pom {
5568
name.set("The MongoDB Reactive Streams Driver")

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/ClientSession.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package com.mongodb.reactivestreams.client;
1919

2020
import com.mongodb.TransactionOptions;
21+
import com.mongodb.internal.observability.micrometer.TransactionSpan;
22+
import com.mongodb.lang.Nullable;
2123
import org.reactivestreams.Publisher;
2224

2325
/**
@@ -94,4 +96,13 @@ public interface ClientSession extends com.mongodb.session.ClientSession {
9496
* @mongodb.server.release 4.0
9597
*/
9698
Publisher<Void> abortTransaction();
99+
100+
/**
101+
* Get the transaction span (if started).
102+
*
103+
* @return the transaction span
104+
* @since 5.7
105+
*/
106+
@Nullable
107+
TransactionSpan getTransactionSpan();
97108
}

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionHelper.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.mongodb.ClientSessionOptions;
2020
import com.mongodb.TransactionOptions;
21+
import com.mongodb.internal.observability.micrometer.TracingManager;
2122
import com.mongodb.internal.session.ServerSessionPool;
2223
import com.mongodb.lang.Nullable;
2324
import com.mongodb.reactivestreams.client.ClientSession;
@@ -31,10 +32,13 @@
3132
public class ClientSessionHelper {
3233
private final MongoClientImpl mongoClient;
3334
private final ServerSessionPool serverSessionPool;
35+
private final TracingManager tracingManager;
3436

35-
public ClientSessionHelper(final MongoClientImpl mongoClient, final ServerSessionPool serverSessionPool) {
37+
public ClientSessionHelper(final MongoClientImpl mongoClient, final ServerSessionPool serverSessionPool,
38+
final TracingManager tracingManager) {
3639
this.mongoClient = mongoClient;
3740
this.serverSessionPool = serverSessionPool;
41+
this.tracingManager = tracingManager;
3842
}
3943

4044
Mono<ClientSession> withClientSession(@Nullable final ClientSession clientSessionFromOperation, final OperationExecutor executor) {
@@ -62,6 +66,6 @@ ClientSession createClientSession(final ClientSessionOptions options, final Oper
6266
.readPreference(mongoClient.getSettings().getReadPreference())
6367
.build()))
6468
.build();
65-
return new ClientSessionPublisherImpl(serverSessionPool, mongoClient, mergedOptions, executor);
69+
return new ClientSessionPublisherImpl(serverSessionPool, mongoClient, mergedOptions, executor, tracingManager);
6670
}
6771
}

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import com.mongodb.TransactionOptions;
2525
import com.mongodb.WriteConcern;
2626
import com.mongodb.internal.TimeoutContext;
27+
import com.mongodb.internal.observability.micrometer.TracingManager;
28+
import com.mongodb.internal.observability.micrometer.TransactionSpan;
2729
import com.mongodb.internal.operation.AbortTransactionOperation;
2830
import com.mongodb.internal.operation.CommitTransactionOperation;
2931
import com.mongodb.internal.operation.ReadOperation;
@@ -48,17 +50,21 @@ final class ClientSessionPublisherImpl extends BaseClientSessionImpl implements
4850

4951
private final MongoClientImpl mongoClient;
5052
private final OperationExecutor executor;
53+
private final TracingManager tracingManager;
5154
private TransactionState transactionState = TransactionState.NONE;
5255
private boolean messageSentInCurrentTransaction;
5356
private boolean commitInProgress;
5457
private TransactionOptions transactionOptions;
58+
@Nullable
59+
private TransactionSpan transactionSpan;
5560

5661

5762
ClientSessionPublisherImpl(final ServerSessionPool serverSessionPool, final MongoClientImpl mongoClient,
58-
final ClientSessionOptions options, final OperationExecutor executor) {
63+
final ClientSessionOptions options, final OperationExecutor executor, final TracingManager tracingManager) {
5964
super(serverSessionPool, mongoClient, options);
6065
this.executor = executor;
6166
this.mongoClient = mongoClient;
67+
this.tracingManager = tracingManager;
6268
}
6369

6470
@Override
@@ -128,6 +134,10 @@ public void startTransaction(final TransactionOptions transactionOptions) {
128134
if (!writeConcern.isAcknowledged()) {
129135
throw new MongoClientException("Transactions do not support unacknowledged write concern");
130136
}
137+
138+
if (tracingManager.isEnabled()) {
139+
transactionSpan = new TransactionSpan(tracingManager);
140+
}
131141
clearTransactionContext();
132142
setTimeoutContext(timeoutContext);
133143
}
@@ -152,6 +162,9 @@ public Publisher<Void> commitTransaction() {
152162
}
153163
if (!messageSentInCurrentTransaction) {
154164
cleanupTransaction(TransactionState.COMMITTED);
165+
if (transactionSpan != null) {
166+
transactionSpan.finalizeTransactionSpan(TransactionState.COMMITTED.name());
167+
}
155168
return Mono.create(MonoSink::success);
156169
} else {
157170
ReadConcern readConcern = transactionOptions.getReadConcern();
@@ -171,7 +184,17 @@ public Publisher<Void> commitTransaction() {
171184
commitInProgress = false;
172185
transactionState = TransactionState.COMMITTED;
173186
})
174-
.doOnError(MongoException.class, this::clearTransactionContextOnError);
187+
.doOnError(MongoException.class, e -> {
188+
clearTransactionContextOnError(e);
189+
if (transactionSpan != null) {
190+
transactionSpan.handleTransactionSpanError(e);
191+
}
192+
})
193+
.doOnSuccess(v -> {
194+
if (transactionSpan != null) {
195+
transactionSpan.finalizeTransactionSpan(TransactionState.COMMITTED.name());
196+
}
197+
});
175198
}
176199
});
177200
}
@@ -191,6 +214,9 @@ public Publisher<Void> abortTransaction() {
191214
}
192215
if (!messageSentInCurrentTransaction) {
193216
cleanupTransaction(TransactionState.ABORTED);
217+
if (transactionSpan != null) {
218+
transactionSpan.finalizeTransactionSpan(TransactionState.ABORTED.name());
219+
}
194220
return Mono.create(MonoSink::success);
195221
} else {
196222
ReadConcern readConcern = transactionOptions.getReadConcern();
@@ -208,6 +234,9 @@ public Publisher<Void> abortTransaction() {
208234
.doOnTerminate(() -> {
209235
clearTransactionContext();
210236
cleanupTransaction(TransactionState.ABORTED);
237+
if (transactionSpan != null) {
238+
transactionSpan.finalizeTransactionSpan(TransactionState.ABORTED.name());
239+
}
211240
});
212241
}
213242
});
@@ -219,6 +248,12 @@ private void clearTransactionContextOnError(final MongoException e) {
219248
}
220249
}
221250

251+
@Override
252+
@Nullable
253+
public TransactionSpan getTransactionSpan() {
254+
return transactionSpan;
255+
}
256+
222257
@Override
223258
public void close() {
224259
if (transactionState == TransactionState.IN) {

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoClientImpl.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.mongodb.internal.connection.Cluster;
3434
import com.mongodb.internal.diagnostics.logging.Logger;
3535
import com.mongodb.internal.diagnostics.logging.Loggers;
36+
import com.mongodb.internal.observability.micrometer.TracingManager;
3637
import com.mongodb.internal.session.ServerSessionPool;
3738
import com.mongodb.lang.Nullable;
3839
import com.mongodb.reactivestreams.client.ChangeStreamPublisher;
@@ -88,9 +89,10 @@ private MongoClientImpl(final MongoClientSettings settings, final MongoDriverInf
8889
notNull("settings", settings);
8990
notNull("cluster", cluster);
9091

92+
TracingManager tracingManager = new TracingManager(settings.getObservabilitySettings());
9193
TimeoutSettings timeoutSettings = TimeoutSettings.create(settings);
9294
ServerSessionPool serverSessionPool = new ServerSessionPool(cluster, timeoutSettings, settings.getServerApi());
93-
ClientSessionHelper clientSessionHelper = new ClientSessionHelper(this, serverSessionPool);
95+
ClientSessionHelper clientSessionHelper = new ClientSessionHelper(this, serverSessionPool, tracingManager);
9496

9597
AutoEncryptionSettings autoEncryptSettings = settings.getAutoEncryptionSettings();
9698
Crypt crypt = autoEncryptSettings != null ? Crypts.createCrypt(settings, autoEncryptSettings) : null;
@@ -100,7 +102,8 @@ private MongoClientImpl(final MongoClientSettings settings, final MongoDriverInf
100102
+ ReactiveContextProvider.class.getName() + " when using the Reactive Streams driver");
101103
}
102104
OperationExecutor operationExecutor = executor != null ? executor
103-
: new OperationExecutorImpl(this, clientSessionHelper, timeoutSettings, (ReactiveContextProvider) contextProvider);
105+
: new OperationExecutorImpl(this, clientSessionHelper, timeoutSettings, (ReactiveContextProvider) contextProvider,
106+
tracingManager);
104107
MongoOperationPublisher<Document> mongoOperationPublisher = new MongoOperationPublisher<>(Document.class,
105108
withUuidRepresentation(settings.getCodecRegistry(),
106109
settings.getUuidRepresentation()),

0 commit comments

Comments
 (0)