Skip to content

Commit 1e48ff8

Browse files
authored
Expose RowDescriptor in Cursor and RowStream (#1577)
* Expose RowDescriptor in Cursor and RowStream Closes #1341 The value is null until the first set of rows is fetched from the database. Signed-off-by: Thomas Segismont <tsegismont@gmail.com> * Disable new tests in ProxySQLPreparedQueryTest Fetch command not supported Signed-off-by: Thomas Segismont <tsegismont@gmail.com> --------- Signed-off-by: Thomas Segismont <tsegismont@gmail.com>
1 parent 051ca23 commit 1e48ff8

6 files changed

Lines changed: 127 additions & 42 deletions

File tree

vertx-mysql-client/src/test/java/io/vertx/tests/mysqlclient/tck/ProxySQLPreparedQueryTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,18 @@ public void testStreamQueryPauseInBatchFromAnotherThread(TestContext ctx) {
7979
public void testStreamQueryPauseResume(TestContext ctx) {
8080
super.testStreamQueryPauseResume(ctx);
8181
}
82+
83+
@Test
84+
@Ignore("Fetch command not supported by ProxySQL")
85+
@Override
86+
public void testQueryCursorNoResults(TestContext ctx) {
87+
super.testQueryCursorNoResults(ctx);
88+
}
89+
90+
@Test
91+
@Ignore("Fetch command not supported by ProxySQL")
92+
@Override
93+
public void testStreamQueryNoResults(TestContext ctx) {
94+
super.testStreamQueryNoResults(ctx);
95+
}
8296
}

vertx-sql-client/src/main/java/io/vertx/sqlclient/Cursor.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,23 @@
1919

2020
import io.vertx.codegen.annotations.VertxGen;
2121
import io.vertx.core.Future;
22+
import io.vertx.sqlclient.desc.RowDescriptor;
2223

2324
/**
2425
* A cursor that reads progressively rows from the database, it is useful for reading very large result sets.
2526
*/
2627
@VertxGen
2728
public interface Cursor {
2829

30+
/**
31+
* Describes rows loaded with {@link #read(int)}.
32+
* <p>
33+
* This returns {@code null} until the first set of rows is fetched from the database.
34+
*/
35+
default RowDescriptor rowDescriptor() {
36+
return null;
37+
}
38+
2939
/**
3040
* Read rows from the cursor, the result is provided asynchronously to the {@code handler}.
3141
*

vertx-sql-client/src/main/java/io/vertx/sqlclient/RowStream.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,23 @@
2222
import io.vertx.core.Future;
2323
import io.vertx.core.Handler;
2424
import io.vertx.core.streams.ReadStream;
25+
import io.vertx.sqlclient.desc.RowDescriptor;
2526

2627
/**
2728
* A row oriented stream.
2829
*/
2930
@VertxGen
3031
public interface RowStream<T> extends ReadStream<T> {
3132

33+
/**
34+
* Describes rows emitted by this stream.
35+
* <p>
36+
* This returns {@code null} until the first set of rows is fetched from the database.
37+
*/
38+
default RowDescriptor rowDescriptor() {
39+
return null;
40+
}
41+
3242
@Fluent
3343
@Override
3444
RowStream<T> exceptionHandler(Handler<Throwable> handler);

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/CursorImpl.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@
2424
import io.vertx.sqlclient.Cursor;
2525
import io.vertx.sqlclient.Row;
2626
import io.vertx.sqlclient.RowSet;
27-
import io.vertx.sqlclient.spi.connection.Connection;
27+
import io.vertx.sqlclient.desc.RowDescriptor;
2828
import io.vertx.sqlclient.internal.TupleBase;
29+
import io.vertx.sqlclient.spi.connection.Connection;
2930

3031
import java.util.UUID;
3132

@@ -42,6 +43,7 @@ public class CursorImpl implements Cursor {
4243

4344
private String id;
4445
private boolean closed;
46+
private RowDescriptor rowDescriptor;
4547
QueryResultBuilder<RowSet<Row>, RowSetImpl<Row>, RowSet<Row>> result;
4648

4749
CursorImpl(PreparedStatementBase ps, Connection conn, ContextInternal context, boolean autoCommit, TupleBase params) {
@@ -60,6 +62,11 @@ public synchronized boolean hasMore() {
6062
return result.isSuspended();
6163
}
6264

65+
@Override
66+
public synchronized RowDescriptor rowDescriptor() {
67+
return rowDescriptor;
68+
}
69+
6370
@Override
6471
public synchronized Future<RowSet<Row>> read(int count) {
6572
PromiseInternal<RowSet<Row>> promise = context.promise();
@@ -71,7 +78,13 @@ public synchronized Future<RowSet<Row>> read(int count) {
7178
suspended = true;
7279
}
7380
ps.readCursor(this, id, suspended, params, count, promise);
74-
return promise.future();
81+
return promise.future().andThen((rowSet, failure) -> {
82+
if (failure == null) {
83+
synchronized (this) {
84+
rowDescriptor = rowSet.rowDescriptor();
85+
}
86+
}
87+
});
7588
}
7689

7790
@Override

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/RowStreamImpl.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,12 @@
1717

1818
package io.vertx.sqlclient.impl;
1919

20-
import io.vertx.core.Future;
21-
import io.vertx.core.internal.ContextInternal;
22-
import io.vertx.sqlclient.Cursor;
23-
import io.vertx.sqlclient.RowIterator;
24-
import io.vertx.sqlclient.RowSet;
25-
import io.vertx.sqlclient.RowStream;
26-
import io.vertx.sqlclient.Row;
27-
import io.vertx.sqlclient.Tuple;
2820
import io.vertx.core.AsyncResult;
21+
import io.vertx.core.Future;
2922
import io.vertx.core.Handler;
23+
import io.vertx.core.internal.ContextInternal;
24+
import io.vertx.sqlclient.*;
25+
import io.vertx.sqlclient.desc.RowDescriptor;
3026

3127
import java.util.Iterator;
3228

@@ -58,6 +54,11 @@ public synchronized Cursor cursor() {
5854
return cursor;
5955
}
6056

57+
@Override
58+
public synchronized RowDescriptor rowDescriptor() {
59+
return cursor != null ? cursor.rowDescriptor() : null;
60+
}
61+
6162
@Override
6263
public synchronized RowStream<Row> exceptionHandler(Handler<Throwable> handler) {
6364
exceptionHandler = handler;
@@ -202,8 +203,12 @@ private void checkPending() {
202203
break;
203204
} else {
204205
cursor.close();
205-
cursor = null;
206-
handler = endHandler;
206+
handler = v -> {
207+
endHandler.handle(null);
208+
synchronized (RowStreamImpl.this) {
209+
cursor = null;
210+
}
211+
};
207212
event = null;
208213
}
209214
}

vertx-sql-client/src/test/java/io/vertx/tests/sqlclient/tck/PreparedQueryTestBase.java

Lines changed: 63 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -38,21 +38,6 @@ public abstract class PreparedQueryTestBase {
3838

3939
protected Consumer<Throwable> msgVerifier;
4040

41-
private static void insertIntoTestTable(TestContext ctx, SqlClient client, int amount, Runnable completionHandler) {
42-
AtomicInteger count = new AtomicInteger();
43-
for (int i = 0; i < 10; i++) {
44-
client
45-
.query("INSERT INTO mutable (id, val) VALUES (" + i + ", 'Whatever-" + i + "')")
46-
.execute()
47-
.onComplete(ctx.asyncAssertSuccess(r1 -> {
48-
ctx.assertEquals(1, r1.rowCount());
49-
if (count.incrementAndGet() == amount) {
50-
completionHandler.run();
51-
}
52-
}));
53-
}
54-
}
55-
5641
protected void connect(Handler<AsyncResult<SqlConnection>> handler) {
5742
connector.connect(handler);
5843
}
@@ -262,23 +247,51 @@ public void testQueryCursor(TestContext ctx) {
262247
conn
263248
.prepare(statement("SELECT * FROM immutable WHERE id=", " OR id=", " OR id=", " OR id=", " OR id=", " OR id=", ""))
264249
.onComplete(ctx.asyncAssertSuccess(ps -> {
265-
Cursor query = ps.cursor(Tuple.of(1, 8, 4, 11, 2, 9));
266-
query
267-
.read(4)
268-
.onComplete(ctx.asyncAssertSuccess(result -> {
269-
ctx.assertNotNull(result.columnsNames());
270-
ctx.assertEquals(4, result.size());
271-
ctx.assertTrue(query.hasMore());
272-
query
250+
Cursor cursor = ps.cursor(Tuple.of(1, 8, 4, 11, 2, 9));
251+
ctx.assertNull(cursor.rowDescriptor());
252+
cursor
273253
.read(4)
274-
.onComplete(ctx.asyncAssertSuccess(result2 -> {
275-
ctx.assertNotNull(result2.columnsNames());
276-
ctx.assertEquals(2, result2.size());
277-
ctx.assertFalse(query.hasMore());
278-
async.complete();
279-
}));
254+
.onComplete(ctx.asyncAssertSuccess(result -> {
255+
ctx.assertNotNull(result.columnsNames());
256+
ctx.assertNotNull(cursor.rowDescriptor());
257+
ctx.assertEquals(result.columnsNames(), cursor.rowDescriptor().columnNames());
258+
ctx.assertEquals(4, result.size());
259+
ctx.assertTrue(cursor.hasMore());
260+
cursor
261+
.read(4)
262+
.onComplete(ctx.asyncAssertSuccess(result2 -> {
263+
ctx.assertNotNull(result2.columnsNames());
264+
ctx.assertNotNull(cursor.rowDescriptor());
265+
ctx.assertEquals(result2.columnsNames(), cursor.rowDescriptor().columnNames());
266+
ctx.assertEquals(2, result2.size());
267+
ctx.assertFalse(cursor.hasMore());
268+
async.complete();
269+
}));
270+
}));
271+
}));
272+
});
273+
}
274+
275+
@Test
276+
public void testQueryCursorNoResults(TestContext ctx) {
277+
Async async = ctx.async();
278+
testCursor(ctx, conn -> {
279+
conn
280+
.prepare("SELECT * FROM immutable WHERE 1=3")
281+
.onComplete(ctx.asyncAssertSuccess(ps -> {
282+
Cursor cursor = ps.cursor(Tuple.tuple());
283+
ctx.assertNull(cursor.rowDescriptor());
284+
cursor
285+
.read(99)
286+
.onComplete(ctx.asyncAssertSuccess(result -> {
287+
ctx.assertNotNull(result.columnsNames());
288+
ctx.assertNotNull(cursor.rowDescriptor());
289+
ctx.assertEquals(result.columnsNames(), cursor.rowDescriptor().columnNames());
290+
ctx.assertEquals(0, result.size());
291+
ctx.assertFalse(cursor.hasMore());
292+
async.complete();
293+
}));
280294
}));
281-
}));
282295
});
283296
}
284297

@@ -337,10 +350,12 @@ public void testStreamQuery(TestContext ctx) {
337350
testCursor(ctx, conn -> {
338351
conn.prepare("SELECT * FROM immutable").onComplete(ctx.asyncAssertSuccess(ps -> {
339352
RowStream<Row> stream = ps.createStream(4, Tuple.tuple());
353+
ctx.assertNull(stream.rowDescriptor());
340354
List<Tuple> rows = new ArrayList<>();
341355
AtomicInteger ended = new AtomicInteger();
342356
stream.handler(tuple -> {
343357
ctx.assertEquals(0, ended.get());
358+
ctx.assertNotNull(stream.rowDescriptor());
344359
rows.add(tuple);
345360
});
346361
Cursor cursor = ((RowStreamInternal) stream).cursor();
@@ -354,6 +369,24 @@ public void testStreamQuery(TestContext ctx) {
354369
});
355370
}
356371

372+
@Test
373+
public void testStreamQueryNoResults(TestContext ctx) {
374+
Async async = ctx.async();
375+
testCursor(ctx, conn -> {
376+
conn.prepare("SELECT * FROM immutable WHERE 1=3").onComplete(ctx.asyncAssertSuccess(ps -> {
377+
RowStream<Row> stream = ps.createStream(99, Tuple.tuple());
378+
ctx.assertNull(stream.rowDescriptor());
379+
stream.endHandler(v -> {
380+
ctx.assertNotNull(stream.rowDescriptor());
381+
async.complete();
382+
});
383+
stream.handler(tuple -> {
384+
ctx.fail("Should not get here");
385+
});
386+
}));
387+
});
388+
}
389+
357390
@Test
358391
public void testStreamQueryPauseInBatch(TestContext ctx) {
359392
testStreamQueryPauseInBatch(ctx, Runnable::run);

0 commit comments

Comments
 (0)