Skip to content

Commit 8731b19

Browse files
authored
Release connection between empty getMore responses in AsyncCommandCursor (#1925)
- Extract getMoreLoop to loop on empty batches while releasing the connection between each getMore, preventing pool starvation from idle tailable cursors (e.g., change streams) - Add test to verify connection is released and re-acquired between consecutive empty getMores JAVA-6142
1 parent 718c2c1 commit 8731b19

File tree

3 files changed

+79
-38
lines changed

3 files changed

+79
-38
lines changed

driver-core/src/main/com/mongodb/internal/operation/AsyncCommandCursor.java

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,12 @@ public void next(final OperationContext operationContext, final SingleResultCall
108108
commandCursorResult = withEmptyResults(commandCursorResult);
109109
funcCallback.onResult(batchResults, null);
110110
} else {
111-
getMore(localServerCursor, operationContext, funcCallback);
111+
getMoreLoop(localServerCursor, operationContext, funcCallback);
112112
}
113113
}, operationContext, callback);
114114
}
115115

116+
116117
@Override
117118
public boolean isClosed() {
118119
return !resourceManager.operable();
@@ -162,14 +163,32 @@ public int getMaxWireVersion() {
162163
return maxWireVersion;
163164
}
164165

166+
private void getMoreLoop(final ServerCursor localServerCursor,
167+
final OperationContext operationContext,
168+
final SingleResultCallback<List<T>> funcCallback) {
169+
getMore(localServerCursor, operationContext, (nextBatch, t) -> {
170+
if (t != null) {
171+
funcCallback.onResult(null, t);
172+
} else if (resourceManager.getServerCursor() == null || (nextBatch != null && !nextBatch.isEmpty())) {
173+
commandCursorResult = withEmptyResults(commandCursorResult);
174+
funcCallback.onResult(nextBatch, null);
175+
} else if (!resourceManager.operable()) {
176+
funcCallback.onResult(emptyList(), null);
177+
} else {
178+
getMoreLoop(assertNotNull(resourceManager.getServerCursor()), operationContext, funcCallback);
179+
}
180+
});
181+
}
182+
165183
private void getMore(final ServerCursor cursor, final OperationContext operationContext, final SingleResultCallback<List<T>> callback) {
166184
resourceManager.executeWithConnection(operationContext, (connection, wrappedCallback) ->
167-
getMoreLoop(assertNotNull(connection), cursor, operationContext, wrappedCallback), callback);
185+
executeGetMoreCommand(assertNotNull(connection), cursor, operationContext, wrappedCallback), callback);
168186
}
169187

170-
private void getMoreLoop(final AsyncConnection connection, final ServerCursor serverCursor,
171-
final OperationContext operationContext,
172-
final SingleResultCallback<List<T>> callback) {
188+
private void executeGetMoreCommand(final AsyncConnection connection,
189+
final ServerCursor serverCursor,
190+
final OperationContext operationContext,
191+
final SingleResultCallback<List<T>> callback) {
173192
connection.commandAsync(namespace.getDatabaseName(),
174193
getMoreCommandDocument(serverCursor.getId(), connection.getDescription(), namespace, batchSize, comment),
175194
NoOpFieldNameValidator.INSTANCE, ReadPreference.primary(),
@@ -188,19 +207,7 @@ private void getMoreLoop(final AsyncConnection connection, final ServerCursor se
188207
connection.getDescription().getServerAddress(), NEXT_BATCH, assertNotNull(commandResult));
189208
ServerCursor nextServerCursor = commandCursorResult.getServerCursor();
190209
resourceManager.setServerCursor(nextServerCursor);
191-
List<T> nextBatch = commandCursorResult.getResults();
192-
if (nextServerCursor == null || !nextBatch.isEmpty()) {
193-
commandCursorResult = withEmptyResults(commandCursorResult);
194-
callback.onResult(nextBatch, null);
195-
return;
196-
}
197-
198-
if (!resourceManager.operable()) {
199-
callback.onResult(emptyList(), null);
200-
return;
201-
}
202-
203-
getMoreLoop(connection, nextServerCursor, operationContext, callback);
210+
callback.onResult(commandCursorResult.getResults(), null);
204211
});
205212
}
206213

driver-core/src/test/functional/com/mongodb/internal/operation/AsyncCommandCursorTest.java

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,18 @@
4141
import org.bson.Document;
4242
import org.bson.codecs.Decoder;
4343
import org.bson.codecs.DocumentCodec;
44-
import org.junit.jupiter.api.Assertions;
4544
import org.junit.jupiter.api.BeforeEach;
4645
import org.junit.jupiter.api.Test;
4746

4847
import java.time.Duration;
48+
import java.util.concurrent.atomic.AtomicInteger;
4949

5050
import static com.mongodb.internal.operation.OperationUnitSpecification.getMaxWireVersionForServerVersion;
5151
import static java.util.concurrent.TimeUnit.MILLISECONDS;
52+
import static org.junit.jupiter.api.Assertions.assertEquals;
53+
import static org.junit.jupiter.api.Assertions.assertNotNull;
54+
import static org.junit.jupiter.api.Assertions.assertNull;
55+
import static org.junit.jupiter.api.Assertions.assertTrue;
5256
import static org.mockito.ArgumentMatchers.any;
5357
import static org.mockito.ArgumentMatchers.argThat;
5458
import static org.mockito.ArgumentMatchers.eq;
@@ -80,11 +84,9 @@ class AsyncCommandCursorTest {
8084
private OperationContext operationContext;
8185
private TimeoutContext timeoutContext;
8286
private ServerDescription serverDescription;
83-
private AsyncCursor<Document> coreCursor;
8487

8588
@BeforeEach
8689
void setUp() {
87-
coreCursor = mock(AsyncCursor.class);
8890
timeoutContext = spy(new TimeoutContext(TimeoutSettings.create(
8991
MongoClientSettings.builder().timeout(TIMEOUT.toMillis(), MILLISECONDS).build())));
9092
operationContext = spy(new OperationContext(
@@ -105,7 +107,7 @@ void setUp() {
105107
serverDescription = mock(ServerDescription.class);
106108
when(operationContext.getTimeoutContext()).thenReturn(timeoutContext);
107109
doAnswer(invocation -> {
108-
SingleResultCallback<AsyncConnection> callback = invocation.getArgument(0);
110+
SingleResultCallback<AsyncConnection> callback = invocation.getArgument(1);
109111
callback.onResult(mockConnection, null);
110112
return null;
111113
}).when(connectionSource).getConnection(any(), any());
@@ -126,9 +128,9 @@ void shouldSkipKillsCursorsCommandWhenNetworkErrorOccurs() {
126128

127129
//when
128130
commandBatchCursor.next(operationContext, (result, t) -> {
129-
Assertions.assertNull(result);
130-
Assertions.assertNotNull(t);
131-
Assertions.assertEquals(MongoSocketException.class, t.getClass());
131+
assertNull(result);
132+
assertNotNull(t);
133+
assertEquals(MongoSocketException.class, t.getClass());
132134
});
133135

134136
//then
@@ -151,9 +153,9 @@ void shouldNotSkipKillsCursorsCommandWhenTimeoutExceptionDoesNotHaveNetworkError
151153

152154
//when
153155
commandBatchCursor.next(operationContext, (result, t) -> {
154-
Assertions.assertNull(result);
155-
Assertions.assertNotNull(t);
156-
Assertions.assertEquals(MongoOperationTimeoutException.class, t.getClass());
156+
assertNull(result);
157+
assertNotNull(t);
158+
assertEquals(MongoOperationTimeoutException.class, t.getClass());
157159
});
158160

159161
commandBatchCursor.close(operationContext);
@@ -182,9 +184,9 @@ void shouldSkipKillsCursorsCommandWhenTimeoutExceptionHaveNetworkErrorCause() {
182184

183185
//when
184186
commandBatchCursor.next(operationContext, (result, t) -> {
185-
Assertions.assertNull(result);
186-
Assertions.assertNotNull(t);
187-
Assertions.assertEquals(MongoOperationTimeoutException.class, t.getClass());
187+
assertNull(result);
188+
assertNotNull(t);
189+
assertEquals(MongoOperationTimeoutException.class, t.getClass());
188190
});
189191

190192
commandBatchCursor.close(operationContext);
@@ -199,6 +201,33 @@ void shouldSkipKillsCursorsCommandWhenTimeoutExceptionHaveNetworkErrorCause() {
199201
}
200202

201203

204+
@Test
205+
void shouldReleaseConnectionBetweenEmptyGetMoreResponses() {
206+
AtomicInteger callCount = new AtomicInteger();
207+
doAnswer(invocation -> {
208+
SingleResultCallback<BsonDocument> cb = invocation.getArgument(6);
209+
cb.onResult(new BsonDocument("cursor",
210+
new BsonDocument("ns", new BsonString(NAMESPACE.getFullName()))
211+
.append("id", new BsonInt64(callCount.incrementAndGet() < 3 ? 1 : 0))
212+
.append("nextBatch", new BsonArrayWrapper<>(new BsonArray()))), null);
213+
return null;
214+
}).when(mockConnection).commandAsync(eq(NAMESPACE.getDatabaseName()),
215+
argThat(doc -> doc.containsKey("getMore")), any(), any(), any(), any(), any());
216+
217+
when(serverDescription.getType()).thenReturn(ServerType.STANDALONE);
218+
createBatchCursor().next(operationContext, (result, t) -> {
219+
assertNotNull(result);
220+
assertTrue(result.isEmpty());
221+
assertNull(t);
222+
});
223+
224+
// 2 empty-batch getMores + 1 exhausted getMore = 3 getMores, but the 3rd
225+
// exhausts the cursor (id=0), which makes the cursor break the loop and return an empty result.
226+
verify(mockConnection, times(3)).release();
227+
verify(connectionSource, times(3)).getConnection(any(), any());
228+
assertEquals(3, callCount.get());
229+
}
230+
202231
private AsyncCursor<Document> createBatchCursor() {
203232
return new AsyncCommandCursor<>(
204233
COMMAND_CURSOR_DOCUMENT,

driver-core/src/test/unit/com/mongodb/internal/operation/AsyncCommandBatchCursorSpecification.groovy

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -167,8 +167,9 @@ class AsyncCommandBatchCursorSpecification extends Specification {
167167
def 'should handle getMore when there are empty results but there is a cursor'() {
168168
given:
169169
def initialConnection = referenceCountedAsyncConnection()
170-
def connection = referenceCountedAsyncConnection()
171-
def connectionSource = getAsyncConnectionSource(connection)
170+
def connectionA = referenceCountedAsyncConnection()
171+
def connectionB = referenceCountedAsyncConnection()
172+
def connectionSource = getAsyncConnectionSource(connectionA, connectionB)
172173

173174
when:
174175
def firstBatch = createCommandResult([], CURSOR_ID)
@@ -177,14 +178,15 @@ class AsyncCommandBatchCursorSpecification extends Specification {
177178
def batch = nextBatch(cursor)
178179

179180
then:
180-
1 * connection.commandAsync(*_) >> {
181-
connection.getCount() == 1
181+
1 * connectionA.commandAsync(*_) >> {
182+
connectionA.getCount() == 1
182183
connectionSource.getCount() == 1
183184
it.last().onResult(response, null)
184185
}
185186

186-
1 * connection.commandAsync(*_) >> {
187-
connection.getCount() == 1
187+
then:
188+
1 * connectionB.commandAsync(*_) >> {
189+
connectionB.getCount() == 1
188190
connectionSource.getCount() == 1
189191
it.last().onResult(response2, null)
190192
}
@@ -196,7 +198,10 @@ class AsyncCommandBatchCursorSpecification extends Specification {
196198
cursor.close()
197199

198200
then:
199-
0 * connection._
201+
0 * connectionA._
202+
0 * connectionB._
203+
connectionA.getCount() == 0
204+
connectionB.getCount() == 0
200205
initialConnection.getCount() == 0
201206
connectionSource.getCount() == 0
202207

0 commit comments

Comments
 (0)