What happens?
Undefined, undesirable behavior when running a query that requires multiple passes over a registered Arrow stream.
This issue may belong in the duckdb-java or duckdb-python repository, but I suspect the behavior stems from a problem in DuckDB core.
If you register an Arrow stream with DuckDB and then run a query that requires multiple table scans, the query fails (Java) or produces incorrect results (Python).
In duckdb-java, the second scan of the stream appears to fail with an error indicating that the stream has already been released: `Invalid Input Error: This stream has been released`
In duckdb-python, the query completes successfully but returns incorrect data: only the rows from the first scan (5 rows instead of the expected 10).
Ideally, the query would complete successfully and return the correct result, materializing the intermediate result as necessary (and applying any predicate/filter pushdown if possible).
To Reproduce
Java repro that throws an error
```java
package com.acme;

import org.apache.arrow.c.ArrowArrayStream;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.duckdb.DuckDBConnection;
import org.duckdb.DuckDBDriver;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Properties;

public class DuckDBStreamIngestTest {

    // Serializes a single 5-row batch (id: 0..4, value: "v 0".."v 4") into Arrow IPC stream bytes.
    private static byte[] createStream(BufferAllocator allocator) throws Exception {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        IntVector intVector = new IntVector("id", allocator);
        VarCharVector stringVector = new VarCharVector("value", allocator);
        try (
            VectorSchemaRoot vsr = new VectorSchemaRoot(List.of(intVector, stringVector));
            ArrowStreamWriter writer = new ArrowStreamWriter(vsr, null, outputStream)
        ) {
            for (int i = 0; i < 5; i++) {
                intVector.setSafe(i, i);
                stringVector.setSafe(i, ("v " + i).getBytes(StandardCharsets.UTF_8));
            }
            vsr.setRowCount(5);
            writer.writeBatch();
        }
        return outputStream.toByteArray();
    }

    public static void main(final String[] args) throws Exception {
        BufferAllocator allocator = new RootAllocator();
        byte[] bytes = createStream(allocator);

        // Read the IPC bytes back and export them through the Arrow C stream interface.
        ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
        ArrowStreamReader arrowReader = new ArrowStreamReader(inputStream, allocator);
        ArrowArrayStream arrowArrayStream = ArrowArrayStream.allocateNew(allocator);
        Data.exportArrayStream(allocator, arrowReader, arrowArrayStream);

        DuckDBDriver driver = new DuckDBDriver();
        try (Connection connection = driver.connect("jdbc:duckdb:", new Properties())) {
            DuckDBConnection conn = connection.unwrap(DuckDBConnection.class);
            conn.registerArrowStream("arrow_table", arrowArrayStream);
            // UNION ALL scans arrow_table twice; this is where the error is raised.
            try (
                Statement statement = connection.createStatement();
                ResultSet resultSet = statement.executeQuery(
                    "select id from arrow_table union all select id + 1 from arrow_table")
            ) {
                printResultSet(resultSet);
            }
        }
    }

    // Prints the column labels, then every row, as comma-separated values.
    private static void printResultSet(ResultSet resultSet) throws SQLException {
        int columnCount = resultSet.getMetaData().getColumnCount();
        for (int i = 1; i <= columnCount; i++) {
            System.out.print(resultSet.getMetaData().getColumnLabel(i) + ", ");
        }
        System.out.println();
        while (resultSet.next()) {
            for (int i = 1; i <= columnCount; i++) {
                System.out.print(resultSet.getString(i) + ", ");
            }
            System.out.println();
        }
    }
}
```
Java output:
```
Exception in thread "main" java.sql.SQLException: Invalid Input Error: This stream has been released
	at org.duckdb.DuckDBNative.duckdb_jdbc_execute(Native Method)
	at org.duckdb.DuckDBPreparedStatement.execute(DuckDBPreparedStatement.java:193)
	at org.duckdb.DuckDBPreparedStatement.execute(DuckDBPreparedStatement.java:159)
	at org.duckdb.DuckDBPreparedStatement.executeQuery(DuckDBPreparedStatement.java:229)
	at org.duckdb.DuckDBPreparedStatement.executeQuery(DuckDBPreparedStatement.java:263)
	at com.acme.DuckDBStreamIngestTest.main(DuckDBStreamIngestTest.java:63)
```
Python repro
```python
import io

import duckdb
import pyarrow as pa


def create_arrow_stream(table):
    # Serialize the table into an in-memory Arrow IPC stream.
    buffer = io.BytesIO()
    with pa.ipc.new_stream(buffer, table.schema) as writer:
        writer.write(table)
    buffer.seek(0)
    return buffer


def main():
    data = {
        'id': [1, 2, 3, 4, 5],
        'value': ['one', 'two', 'three', 'four', 'five']
    }
    table = pa.table(data)
    stream_buffer1 = create_arrow_stream(table)
    with pa.ipc.open_stream(stream_buffer1) as stream1:
        # Register the single-pass RecordBatchReader (not a materialized table).
        duckdb.register("arrow_stream", stream1)
        # UNION ALL scans arrow_stream twice.
        sql = "SELECT id FROM arrow_stream union all select id + 1 from arrow_stream"
        print("Query Results:")
        # show() prints the result itself and returns None, so it is not wrapped in print().
        duckdb.sql(sql).show()


if __name__ == "__main__":
    main()
```
Python output:
```
Query Results:
┌───────┐
│  id   │
│ int64 │
├───────┤
│     1 │
│     2 │
│     3 │
│     4 │
│     5 │
└───────┘
```
In both cases, if you first materialize the stream into a table (`CREATE TABLE <tablename> AS SELECT * FROM arrow_stream`) and query that table instead, the query completes and produces the correct result. Java output after materializing:
```
id,
0,
1,
2,
3,
4,
1,
2,
3,
4,
5,
```
OS:
Linux x86_64, macOS aarch64
DuckDB Version:
1.3.2
DuckDB Client:
duckdb-java, duckdb-python
Hardware:
No response
Full Name:
Jonathan Swenson
Affiliation:
Omni
What is the latest build you tested with? If possible, we recommend testing with the latest nightly build.
I have tested with a stable release
Did you include all relevant data sets for reproducing the issue?
Yes
Did you include all code required to reproduce the issue?
Did you include all relevant configuration (e.g., CPU architecture, Python version, Linux distribution) to reproduce the issue?