Skip to content

Commit 88071e0

Browse files
stIncMalenhachicha
authored andcommitted
Fix/improve ClientSessionImpl, remove ClientSessionClock
1 parent 9f34468 commit 88071e0

9 files changed

Lines changed: 107 additions & 95 deletions

File tree

driver-core/src/main/com/mongodb/internal/TimeoutContext.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,10 @@ public Timeout timeoutIncludingRoundTrip() {
168168
return timeout == null ? null : timeout.shortenBy(minRoundTripTimeMS, MILLISECONDS);
169169
}
170170

171+
public Timeout timeoutOrAlternative(final Timeout alternative) {
172+
return timeout == null ? alternative : timeout;
173+
}
174+
171175
/**
172176
* Returns the remaining {@code timeoutMS} if set or the {@code alternativeTimeoutMS}.
173177
*
@@ -176,6 +180,7 @@ public Timeout timeoutIncludingRoundTrip() {
176180
* @param alternativeTimeoutMS the alternative timeout.
177181
* @return timeout to use.
178182
*/
183+
@VisibleForTesting(otherwise = PRIVATE)
179184
public long timeoutOrAlternative(final long alternativeTimeoutMS) {
180185
if (timeout == null) {
181186
return alternativeTimeoutMS;
@@ -380,11 +385,6 @@ public TimeoutContext withAdditionalReadTimeout(final int additionalReadTimeout)
380385
return new TimeoutContext(timeoutSettings.withReadTimeoutMS(newReadTimeout > 0 ? newReadTimeout : Long.MAX_VALUE));
381386
}
382387

383-
// Creates a copy of the timeout context that can be reset without resetting the original.
384-
public TimeoutContext copyTimeoutContext() {
385-
return new TimeoutContext(getTimeoutSettings(), getTimeout());
386-
}
387-
388388
@Override
389389
public String toString() {
390390
return "TimeoutContext{"

driver-core/src/main/com/mongodb/internal/connection/Time.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,11 @@
2020
* To enable unit testing of classes that rely on System.nanoTime
2121
*
2222
* <p>This class is not part of the public API and may be removed or changed at any time</p>
23+
*
24+
* @deprecated Use {@link com.mongodb.internal.time.SystemNanoTime} in production code,
25+
* and {@code Mockito.mockStatic} in test code to tamper with it.
2326
*/
27+
@Deprecated
2428
public final class Time {
2529
static final long CONSTANT_TIME = 42;
2630

driver-core/src/main/com/mongodb/internal/time/StartTime.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,6 @@ public interface StartTime {
5959
* @return a StartPoint, as of now
6060
*/
6161
static StartTime now() {
62-
return TimePoint.at(System.nanoTime());
62+
return TimePoint.at(SystemNanoTime.get());
6363
}
6464
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.mongodb.internal.time;
17+
18+
/**
19+
* Avoid using this class directly and prefer using other program elements from {@link com.mongodb.internal.time}, if possible.
20+
* <p>
21+
* We do not use {@link System#nanoTime()} directly in the rest of the {@link com.mongodb.internal.time} package,
22+
* and use {@link SystemNanoTime#get()} instead because we need to tamper with it via {@code Mockito.mockStatic},
23+
* and mocking methods of {@link System} class is both impossible and unwise.
24+
*/
25+
public final class SystemNanoTime {
26+
private SystemNanoTime() {
27+
}
28+
29+
public static long get() {
30+
return System.nanoTime();
31+
}
32+
}

driver-core/src/main/com/mongodb/internal/time/TimePoint.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,14 @@ static TimePoint at(@Nullable final Long nanos) {
6161

6262
@VisibleForTesting(otherwise = PRIVATE)
6363
long currentNanos() {
64-
return System.nanoTime();
64+
return SystemNanoTime.get();
6565
}
6666

6767
/**
6868
* Returns the current {@link TimePoint}.
6969
*/
7070
static TimePoint now() {
71-
return at(System.nanoTime());
71+
return at(SystemNanoTime.get());
7272
}
7373

7474
/**

driver-sync/src/main/com/mongodb/client/internal/ClientSessionClock.java

Lines changed: 0 additions & 41 deletions
This file was deleted.

driver-sync/src/main/com/mongodb/client/internal/ClientSessionImpl.java

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,12 @@
3939
import com.mongodb.internal.session.BaseClientSessionImpl;
4040
import com.mongodb.internal.session.ServerSessionPool;
4141
import com.mongodb.internal.time.ExponentialBackoff;
42+
import com.mongodb.internal.time.Timeout;
4243
import com.mongodb.lang.Nullable;
4344

45+
import java.util.concurrent.TimeUnit;
46+
import java.util.function.BooleanSupplier;
47+
4448
import static com.mongodb.MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL;
4549
import static com.mongodb.MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL;
4650
import static com.mongodb.assertions.Assertions.assertNotNull;
@@ -51,7 +55,7 @@
5155

5256
final class ClientSessionImpl extends BaseClientSessionImpl implements ClientSession {
5357

54-
private static final int MAX_RETRY_TIME_LIMIT_MS = 120000;
58+
private static final long MAX_RETRY_TIME_LIMIT_MS = 120000;
5559

5660
private final OperationExecutor operationExecutor;
5761
private TransactionState transactionState = TransactionState.NONE;
@@ -251,21 +255,23 @@ public <T> T withTransaction(final TransactionBody<T> transactionBody) {
251255
@Override
252256
public <T> T withTransaction(final TransactionBody<T> transactionBody, final TransactionOptions options) {
253257
notNull("transactionBody", transactionBody);
254-
long startTime = ClientSessionClock.INSTANCE.now();
255258
TimeoutContext withTransactionTimeoutContext = createTimeoutContext(options);
256-
ExponentialBackoff transactionBackoff = ExponentialBackoff.TRANSACTION;
259+
Timeout withTransactionTimeout = withTransactionTimeoutContext.timeoutOrAlternative(
260+
assertNotNull(TimeoutContext.startTimeout(MAX_RETRY_TIME_LIMIT_MS)));
261+
BooleanSupplier withTransactionTimeoutExpired = () -> withTransactionTimeout.call(TimeUnit.MILLISECONDS,
262+
() -> false, ms -> false, () -> true);
257263
int transactionAttempt = 0;
258264
MongoException lastError = null;
259265

260266
try {
261267
outer:
262268
while (true) {
263269
if (transactionAttempt > 0) {
264-
backoff(transactionBackoff, transactionAttempt, startTime, lastError);
270+
backoff(transactionAttempt, withTransactionTimeout, assertNotNull(lastError));
265271
}
266272
T retVal;
267273
try {
268-
startTransaction(options, withTransactionTimeoutContext.copyTimeoutContext());
274+
startTransaction(options, withTransactionTimeoutContext);
269275
transactionAttempt++;
270276

271277
if (transactionSpan != null) {
@@ -281,7 +287,7 @@ public <T> T withTransaction(final TransactionBody<T> transactionBody, final Tra
281287
if (!(e instanceof MongoOperationTimeoutException)) {
282288
MongoException exceptionToHandle = OperationHelper.unwrap((MongoException) e);
283289
if (exceptionToHandle.hasErrorLabel(TRANSIENT_TRANSACTION_ERROR_LABEL)
284-
&& ClientSessionClock.INSTANCE.now() - startTime < MAX_RETRY_TIME_LIMIT_MS) {
290+
&& !withTransactionTimeoutExpired.getAsBoolean()) {
285291
if (transactionSpan != null) {
286292
transactionSpan.spanFinalizing(false);
287293
}
@@ -297,9 +303,10 @@ public <T> T withTransaction(final TransactionBody<T> transactionBody, final Tra
297303
commitTransaction(false);
298304
break;
299305
} catch (MongoException e) {
306+
lastError = e;
300307
clearTransactionContextOnError(e);
301308
if (!(e instanceof MongoOperationTimeoutException)
302-
&& ClientSessionClock.INSTANCE.now() - startTime < MAX_RETRY_TIME_LIMIT_MS) {
309+
&& !withTransactionTimeoutExpired.getAsBoolean()) {
303310
applyMajorityWriteConcernToTransactionOptions();
304311

305312
if (!(e instanceof MongoExecutionTimeoutException)
@@ -309,7 +316,6 @@ public <T> T withTransaction(final TransactionBody<T> transactionBody, final Tra
309316
if (transactionSpan != null) {
310317
transactionSpan.spanFinalizing(true);
311318
}
312-
lastError = e;
313319
continue outer;
314320
}
315321
}
@@ -374,15 +380,12 @@ private TimeoutContext createTimeoutContext(final TransactionOptions transaction
374380
operationExecutor.getTimeoutSettings()));
375381
}
376382

377-
private static void backoff(final ExponentialBackoff exponentialBackoff, final int transactionAttempt, final long startTime,
378-
final MongoException lastError) {
379-
long backoffMs = exponentialBackoff.calculateDelayBeforeNextRetryMs(transactionAttempt - 1);
380-
if (ClientSessionClock.INSTANCE.now() + backoffMs - startTime >= MAX_RETRY_TIME_LIMIT_MS) {
381-
if (lastError != null) {
382-
throw lastError;
383-
}
384-
throw new MongoClientException("Transaction retry timeout exceeded");
385-
}
383+
private static void backoff(final int transactionAttempt,
384+
final Timeout withTransactionTimeout, final MongoException lastError) {
385+
long backoffMs = ExponentialBackoff.TRANSACTION.calculateDelayBeforeNextRetryMs(transactionAttempt - 1);
386+
withTransactionTimeout.shortenBy(backoffMs, TimeUnit.MILLISECONDS).onExpired(() -> {
387+
throw lastError;
388+
});
386389
try {
387390
if (backoffMs > 0) {
388391
Thread.sleep(backoffMs);

driver-sync/src/test/functional/com/mongodb/client/ClientSideOperationTimeoutTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323

2424
import static org.junit.jupiter.api.Assumptions.assumeFalse;
2525

26-
2726
// See https://github.com/mongodb/specifications/tree/master/source/client-side-operation-timeout/tests
2827
public class ClientSideOperationTimeoutTest extends UnifiedSyncTest {
2928

driver-sync/src/test/functional/com/mongodb/client/WithTransactionProseTest.java

Lines changed: 43 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,22 @@
2020
import com.mongodb.MongoClientException;
2121
import com.mongodb.MongoException;
2222
import com.mongodb.TransactionOptions;
23-
import com.mongodb.client.internal.ClientSessionClock;
2423
import com.mongodb.client.model.Sorts;
2524
import com.mongodb.internal.time.ExponentialBackoff;
25+
import com.mongodb.internal.time.StartTime;
26+
import com.mongodb.internal.time.SystemNanoTime;
2627
import org.bson.BsonDocument;
2728
import org.bson.Document;
2829
import org.junit.jupiter.api.BeforeEach;
2930
import org.junit.jupiter.api.DisplayName;
3031
import org.junit.jupiter.api.Test;
32+
import org.mockito.MockedStatic;
33+
import org.mockito.Mockito;
3134

35+
import java.time.Duration;
3236
import java.util.concurrent.TimeUnit;
3337
import java.util.concurrent.atomic.AtomicInteger;
38+
import java.util.function.Consumer;
3439

3540
import static com.mongodb.ClusterFixture.TIMEOUT;
3641
import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet;
@@ -44,8 +49,7 @@
4449

4550
// See https://github.com/mongodb/specifications/blob/master/source/transactions-convenient-api/tests/README.md#prose-tests
4651
public class WithTransactionProseTest extends DatabaseTestCase {
47-
private static final long START_TIME_MS = 1L;
48-
private static final long ERROR_GENERATING_INTERVAL = 121000L;
52+
private static final Duration ERROR_GENERATING_INTERVAL = Duration.ofSeconds(120);
4953

5054
@BeforeEach
5155
@Override
@@ -66,7 +70,7 @@ public void setUp() {
6670
public void testCallbackRaisesCustomError() {
6771
final String exceptionMessage = "NotTransientOrUnknownError";
6872
try (ClientSession session = client.startSession()) {
69-
session.withTransaction((TransactionBody<Void>) () -> {
73+
session.withTransaction(() -> {
7074
throw new MongoException(exceptionMessage);
7175
});
7276
// should not get here
@@ -101,13 +105,13 @@ public void testRetryTimeoutEnforcedTransientTransactionError() {
101105
final String errorMessage = "transient transaction error";
102106

103107
try (ClientSession session = client.startSession()) {
104-
ClientSessionClock.INSTANCE.setTime(START_TIME_MS);
105-
session.withTransaction((TransactionBody<Void>) () -> {
106-
ClientSessionClock.INSTANCE.setTime(ERROR_GENERATING_INTERVAL);
107-
MongoException e = new MongoException(112, errorMessage);
108-
e.addLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL);
109-
throw e;
110-
});
108+
doWithSystemNanoTimeHandle(systemNanoTimeHandle ->
109+
session.withTransaction(() -> {
110+
systemNanoTimeHandle.setRelativeToStart(ERROR_GENERATING_INTERVAL);
111+
MongoException e = new MongoException(112, errorMessage);
112+
e.addLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL);
113+
throw e;
114+
}));
111115
fail("Test should have thrown an exception.");
112116
} catch (Exception e) {
113117
assertEquals(errorMessage, e.getMessage());
@@ -127,12 +131,12 @@ public void testRetryTimeoutEnforcedUnknownTransactionCommit() {
127131
+ "'data': {'failCommands': ['commitTransaction'], 'errorCode': 91, 'closeConnection': false}}"));
128132

129133
try (ClientSession session = client.startSession()) {
130-
ClientSessionClock.INSTANCE.setTime(START_TIME_MS);
131-
session.withTransaction((TransactionBody<Void>) () -> {
132-
ClientSessionClock.INSTANCE.setTime(ERROR_GENERATING_INTERVAL);
133-
collection.insertOne(session, new Document("_id", 2));
134-
return null;
135-
});
134+
doWithSystemNanoTimeHandle(systemNanoTimeHandle ->
135+
session.withTransaction(() -> {
136+
systemNanoTimeHandle.setRelativeToStart(ERROR_GENERATING_INTERVAL);
137+
collection.insertOne(session, new Document("_id", 2));
138+
return null;
139+
}));
136140
fail("Test should have thrown an exception.");
137141
} catch (Exception e) {
138142
assertEquals(91, ((MongoException) e).getCode());
@@ -156,12 +160,12 @@ public void testRetryTimeoutEnforcedTransientTransactionErrorOnCommit() {
156160
+ "'errmsg': 'Transaction 0 has been aborted', 'closeConnection': false}}"));
157161

158162
try (ClientSession session = client.startSession()) {
159-
ClientSessionClock.INSTANCE.setTime(START_TIME_MS);
160-
session.withTransaction((TransactionBody<Void>) () -> {
161-
ClientSessionClock.INSTANCE.setTime(ERROR_GENERATING_INTERVAL);
162-
collection.insertOne(session, Document.parse("{ _id : 1 }"));
163-
return null;
164-
});
163+
doWithSystemNanoTimeHandle(systemNanoTimeHandle ->
164+
session.withTransaction(() -> {
165+
systemNanoTimeHandle.setRelativeToStart(ERROR_GENERATING_INTERVAL);
166+
collection.insertOne(session, Document.parse("{ _id : 1 }"));
167+
return null;
168+
}));
165169
fail("Test should have thrown an exception.");
166170
} catch (Exception e) {
167171
assertEquals(251, ((MongoException) e).getCode());
@@ -224,9 +228,9 @@ public void testRetryBackoffIsEnforced() throws InterruptedException {
224228
long noBackoffTime;
225229
try (ClientSession session = client.startSession();
226230
FailPoint ignored = FailPoint.enable(failPointDocument, getPrimary())) {
227-
long startNanos = System.nanoTime();
231+
StartTime startTime = StartTime.now();
228232
session.withTransaction(() -> collection.insertOne(session, Document.parse("{}")));
229-
noBackoffTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
233+
noBackoffTime = startTime.elapsed().toMillis();
230234
} finally {
231235
// Clear the test jitter supplier to avoid affecting other tests
232236
ExponentialBackoff.clearTestJitterSupplier();
@@ -241,9 +245,9 @@ public void testRetryBackoffIsEnforced() throws InterruptedException {
241245
long withBackoffTime;
242246
try (ClientSession session = client.startSession();
243247
FailPoint ignored = FailPoint.enable(failPointDocument, getPrimary())) {
244-
long startNanos = System.nanoTime();
248+
StartTime startTime = StartTime.now();
245249
session.withTransaction(() -> collection.insertOne(session, Document.parse("{}")));
246-
withBackoffTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
250+
withBackoffTime = startTime.elapsed().toMillis();
247251
} finally {
248252
ExponentialBackoff.clearTestJitterSupplier();
249253
}
@@ -278,7 +282,18 @@ public void testExponentialBackoffOnTransientError() throws InterruptedException
278282
}
279283
}
280284

281-
private boolean canRunTests() {
285+
private static boolean canRunTests() {
282286
return isSharded() || isDiscoverableReplicaSet();
283287
}
288+
289+
private static void doWithSystemNanoTimeHandle(final Consumer<SystemNanoTimeHandle> action) {
290+
long startNanos = SystemNanoTime.get();
291+
try (MockedStatic<SystemNanoTime> mockedStaticSystem = Mockito.mockStatic(SystemNanoTime.class)) {
292+
action.accept(change -> mockedStaticSystem.when(SystemNanoTime::get).thenReturn(startNanos + change.toNanos()));
293+
}
294+
}
295+
296+
private interface SystemNanoTimeHandle {
297+
void setRelativeToStart(Duration change);
298+
}
284299
}

0 commit comments

Comments
 (0)