Skip to content

Commit 9a315a0

Browse files
authored
Release connection between empty getMore responses (#1928)
- Extract getMoreLoop to loop on empty batches while releasing the connection between each getMore, preventing pool starvation from idle tailable cursors (e.g., change streams) - Add test to verify connection is released and re-acquired between consecutive empty getMores JAVA-6142
1 parent 0be7f11 commit 9a315a0

3 files changed

Lines changed: 83 additions & 52 deletions

File tree

driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public void next(final SingleResultCallback<List<T>> callback) {
121121
commandCursorResult = withEmptyResults(commandCursorResult);
122122
funcCallback.onResult(batchResults, null);
123123
} else {
124-
getMore(localServerCursor, funcCallback);
124+
getMoreLoop(localServerCursor, funcCallback);
125125
}
126126
}, callback);
127127
}
@@ -183,10 +183,10 @@ void checkTimeoutModeAndResetTimeoutContextIfIteration() {
183183

184184
private void getMore(final ServerCursor cursor, final SingleResultCallback<List<T>> callback) {
185185
resourceManager.executeWithConnection((connection, wrappedCallback) ->
186-
getMoreLoop(assertNotNull(connection), cursor, wrappedCallback), callback);
186+
executeGetMoreCommand(assertNotNull(connection), cursor, wrappedCallback), callback);
187187
}
188188

189-
private void getMoreLoop(final AsyncConnection connection, final ServerCursor serverCursor,
189+
private void executeGetMoreCommand(final AsyncConnection connection, final ServerCursor serverCursor,
190190
final SingleResultCallback<List<T>> callback) {
191191
connection.commandAsync(namespace.getDatabaseName(),
192192
getMoreCommandDocument(serverCursor.getId(), connection.getDescription(), namespace, batchSize, comment),
@@ -206,19 +206,22 @@ private void getMoreLoop(final AsyncConnection connection, final ServerCursor se
206206
connection.getDescription().getServerAddress(), NEXT_BATCH, assertNotNull(commandResult));
207207
ServerCursor nextServerCursor = commandCursorResult.getServerCursor();
208208
resourceManager.setServerCursor(nextServerCursor);
209-
List<T> nextBatch = commandCursorResult.getResults();
210-
if (nextServerCursor == null || !nextBatch.isEmpty()) {
211-
commandCursorResult = withEmptyResults(commandCursorResult);
212-
callback.onResult(nextBatch, null);
213-
return;
214-
}
215-
216-
if (!resourceManager.operable()) {
217-
callback.onResult(emptyList(), null);
218-
return;
219-
}
209+
callback.onResult(commandCursorResult.getResults(), null);
210+
});
211+
}
220212

221-
getMoreLoop(connection, nextServerCursor, callback);
213+
private void getMoreLoop(final ServerCursor localServerCursor, final SingleResultCallback<List<T>> funcCallback) {
214+
getMore(localServerCursor, (nextBatch, t) -> {
215+
if (t != null) {
216+
funcCallback.onResult(null, t);
217+
} else if (resourceManager.getServerCursor() == null || (nextBatch != null && !nextBatch.isEmpty())) {
218+
commandCursorResult = withEmptyResults(commandCursorResult);
219+
funcCallback.onResult(nextBatch, null);
220+
} else if (!resourceManager.operable()) {
221+
funcCallback.onResult(emptyList(), null);
222+
} else {
223+
getMoreLoop(assertNotNull(resourceManager.getServerCursor()), funcCallback);
224+
}
222225
});
223226
}
224227

driver-core/src/test/functional/com/mongodb/internal/operation/AsyncCommandBatchCursorTest.java

Lines changed: 53 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,14 @@
4747
import org.junit.jupiter.params.provider.ValueSource;
4848

4949
import java.time.Duration;
50+
import java.util.concurrent.atomic.AtomicInteger;
5051

5152
import static com.mongodb.internal.operation.OperationUnitSpecification.getMaxWireVersionForServerVersion;
5253
import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException;
5354
import static java.util.concurrent.TimeUnit.MILLISECONDS;
55+
import static org.junit.jupiter.api.Assertions.assertEquals;
56+
import static org.junit.jupiter.api.Assertions.assertNotNull;
57+
import static org.junit.jupiter.api.Assertions.assertNull;
5458
import static org.junit.jupiter.api.Assertions.assertTrue;
5559
import static org.mockito.ArgumentMatchers.any;
5660
import static org.mockito.ArgumentMatchers.argThat;
@@ -119,17 +123,17 @@ void shouldSkipKillsCursorsCommandWhenNetworkErrorOccurs() {
119123
return null;
120124
}).when(mockConnection).commandAsync(eq(NAMESPACE.getDatabaseName()), any(), any(), any(), any(), any(), any());
121125
when(serverDescription.getType()).thenReturn(ServerType.LOAD_BALANCER);
122-
AsyncCommandBatchCursor<Document> commandBatchCursor = createBatchCursor(0);
123126

124127
//when
125-
commandBatchCursor.next((result, t) -> {
126-
Assertions.assertNull(result);
127-
Assertions.assertNotNull(t);
128-
Assertions.assertEquals(MongoSocketException.class, t.getClass());
129-
});
128+
try (AsyncCommandBatchCursor<Document> commandBatchCursor = createBatchCursor(0)) {
129+
commandBatchCursor.next((result, t) -> {
130+
Assertions.assertNull(result);
131+
Assertions.assertNotNull(t);
132+
Assertions.assertEquals(MongoSocketException.class, t.getClass());
133+
});
134+
}
130135

131136
//then
132-
commandBatchCursor.close();
133137
verify(mockConnection, times(1)).commandAsync(eq(NAMESPACE.getDatabaseName()), any(), any(), any(), any(), any(), any());
134138
}
135139

@@ -144,17 +148,14 @@ void shouldNotSkipKillsCursorsCommandWhenTimeoutExceptionDoesNotHaveNetworkError
144148
}).when(mockConnection).commandAsync(eq(NAMESPACE.getDatabaseName()), any(), any(), any(), any(), any(), any());
145149
when(serverDescription.getType()).thenReturn(ServerType.LOAD_BALANCER);
146150

147-
AsyncCommandBatchCursor<Document> commandBatchCursor = createBatchCursor(0);
148-
149151
//when
150-
commandBatchCursor.next((result, t) -> {
151-
Assertions.assertNull(result);
152-
Assertions.assertNotNull(t);
153-
Assertions.assertEquals(MongoOperationTimeoutException.class, t.getClass());
154-
});
155-
156-
commandBatchCursor.close();
157-
152+
try (AsyncCommandBatchCursor<Document> commandBatchCursor = createBatchCursor(0)) {
153+
commandBatchCursor.next((result, t) -> {
154+
Assertions.assertNull(result);
155+
Assertions.assertNotNull(t);
156+
Assertions.assertEquals(MongoOperationTimeoutException.class, t.getClass());
157+
});
158+
}
158159

159160
//then
160161
verify(mockConnection, times(2)).commandAsync(any(),
@@ -175,16 +176,14 @@ void shouldSkipKillsCursorsCommandWhenTimeoutExceptionHaveNetworkErrorCause() {
175176
}).when(mockConnection).commandAsync(eq(NAMESPACE.getDatabaseName()), any(), any(), any(), any(), any(), any());
176177
when(serverDescription.getType()).thenReturn(ServerType.LOAD_BALANCER);
177178

178-
AsyncCommandBatchCursor<Document> commandBatchCursor = createBatchCursor(0);
179-
180179
//when
181-
commandBatchCursor.next((result, t) -> {
182-
Assertions.assertNull(result);
183-
Assertions.assertNotNull(t);
184-
Assertions.assertEquals(MongoOperationTimeoutException.class, t.getClass());
185-
});
186-
187-
commandBatchCursor.close();
180+
try (AsyncCommandBatchCursor<Document> commandBatchCursor = createBatchCursor(0)) {
181+
commandBatchCursor.next((result, t) -> {
182+
Assertions.assertNull(result);
183+
Assertions.assertNotNull(t);
184+
Assertions.assertEquals(MongoOperationTimeoutException.class, t.getClass());
185+
});
186+
}
188187

189188
//then
190189
verify(mockConnection, times(1)).commandAsync(any(),
@@ -203,8 +202,6 @@ void closeShouldResetTimeoutContextToDefaultMaxTime() {
203202
try (AsyncCommandBatchCursor<Document> commandBatchCursor = createBatchCursor(maxTimeMS)) {
204203
// verify that the `maxTimeMS` override was applied
205204
timeoutContext.runMaxTimeMS(remainingMillis -> assertTrue(remainingMillis <= maxTimeMS));
206-
} catch (Exception e) {
207-
throw new RuntimeException(e);
208205
}
209206
timeoutContext.runMaxTimeMS(remainingMillis -> {
210207
// verify that the `maxTimeMS` override was reset
@@ -236,8 +233,6 @@ void closeShouldNotResetOriginalTimeout(final boolean disableTimeoutResetWhenClo
236233
Thread.sleep(thirdOfTimeout.toMillis());
237234
return null;
238235
});
239-
} catch (Exception e) {
240-
throw new RuntimeException(e);
241236
}
242237
verify(mockConnection, times(1)).release();
243238
// at this point at least (2 * thirdOfTimeout) have passed
@@ -252,6 +247,34 @@ void closeShouldNotResetOriginalTimeout(final boolean disableTimeoutResetWhenClo
252247
Assertions::fail);
253248
}
254249

250+
@Test
251+
void shouldReleaseConnectionBetweenEmptyGetMoreResponses() {
252+
AtomicInteger callCount = new AtomicInteger();
253+
doAnswer(invocation -> {
254+
SingleResultCallback<BsonDocument> cb = invocation.getArgument(6);
255+
cb.onResult(new BsonDocument("cursor",
256+
new BsonDocument("ns", new BsonString(NAMESPACE.getFullName()))
257+
.append("id", new BsonInt64(callCount.incrementAndGet() < 3 ? 1 : 0))
258+
.append("nextBatch", new BsonArrayWrapper<>(new BsonArray()))), null);
259+
return null;
260+
}).when(mockConnection).commandAsync(eq(NAMESPACE.getDatabaseName()),
261+
argThat(doc -> doc.containsKey("getMore")), any(), any(), any(), any(), any());
262+
263+
when(serverDescription.getType()).thenReturn(ServerType.STANDALONE);
264+
try (AsyncCommandBatchCursor<Document> commandBatchCursor = createBatchCursor(0)) {
265+
commandBatchCursor.next((result, t) -> {
266+
assertNotNull(result);
267+
assertTrue(result.isEmpty());
268+
assertNull(t);
269+
});
270+
}
271+
272+
// 2 empty-batch getMores + 1 exhausted getMore = 3 getMores, but the 3rd
273+
// exhausts the cursor (id=0), which makes the cursor break the loop and return an empty result.
274+
verify(mockConnection, times(3)).release();
275+
verify(connectionSource, times(3)).getConnection(any());
276+
assertEquals(3, callCount.get());
277+
}
255278

256279
private AsyncCommandBatchCursor<Document> createBatchCursor(final long maxTimeMS) {
257280
return new AsyncCommandBatchCursor<Document>(

driver-core/src/test/unit/com/mongodb/internal/operation/AsyncCommandBatchCursorSpecification.groovy

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,9 @@ class AsyncCommandBatchCursorSpecification extends Specification {
162162
def 'should handle getMore when there are empty results but there is a cursor'() {
163163
given:
164164
def initialConnection = referenceCountedAsyncConnection()
165-
def connection = referenceCountedAsyncConnection()
166-
def connectionSource = getAsyncConnectionSource(connection)
165+
def connectionA = referenceCountedAsyncConnection()
166+
def connectionB = referenceCountedAsyncConnection()
167+
def connectionSource = getAsyncConnectionSource(connectionA, connectionB)
167168

168169
when:
169170
def firstBatch = createCommandResult([], CURSOR_ID)
@@ -172,14 +173,15 @@ class AsyncCommandBatchCursorSpecification extends Specification {
172173
def batch = nextBatch(cursor)
173174

174175
then:
175-
1 * connection.commandAsync(*_) >> {
176-
connection.getCount() == 1
176+
1 * connectionA.commandAsync(*_) >> {
177+
connectionA.getCount() == 1
177178
connectionSource.getCount() == 1
178179
it.last().onResult(response, null)
179180
}
180181

181-
1 * connection.commandAsync(*_) >> {
182-
connection.getCount() == 1
182+
then:
183+
1 * connectionB.commandAsync(*_) >> {
184+
connectionB.getCount() == 1
183185
connectionSource.getCount() == 1
184186
it.last().onResult(response2, null)
185187
}
@@ -191,7 +193,10 @@ class AsyncCommandBatchCursorSpecification extends Specification {
191193
cursor.close()
192194

193195
then:
194-
0 * connection._
196+
0 * connectionA._
197+
0 * connectionB._
198+
connectionA.getCount() == 0
199+
connectionB.getCount() == 0
195200
initialConnection.getCount() == 0
196201
connectionSource.getCount() == 0
197202

0 commit comments

Comments
 (0)