@@ -66,6 +66,45 @@ unique_ptr<ArrowArrayStreamWrapper> PythonTableArrowArrayStreamFactory::Produce(
 	py::handle arrow_obj_handle(factory->arrow_object);
 	auto arrow_object_type = DuckDBPyConnection::GetArrowType(arrow_obj_handle);
 
+	if (arrow_object_type == PyArrowObjectType::PyCapsuleInterface) {
+		py::object capsule_obj = arrow_obj_handle.attr("__arrow_c_stream__")();
+		auto capsule = py::reinterpret_borrow<py::capsule>(capsule_obj);
+		auto stream = capsule.get_pointer<struct ArrowArrayStream>();
+		if (!stream->release) {
+			throw InvalidInputException(
+			    "The __arrow_c_stream__() method returned a released stream. "
+			    "If this object is single-use, implement __arrow_c_schema__() or expose a .schema attribute "
+			    "with _export_to_c() so that DuckDB can extract the schema without consuming the stream.");
+		}
+
+		auto &import_cache_check = *DuckDBPyConnection::ImportCache();
+		if (import_cache_check.pyarrow.dataset()) {
+			// Tier A: full pushdown via pyarrow.dataset.
+			// Import as a RecordBatchReader, then feed it through Scanner.from_batches for projection/filter pushdown.
+			auto pyarrow_lib_module = py::module::import("pyarrow").attr("lib");
+			auto import_func = pyarrow_lib_module.attr("RecordBatchReader").attr("_import_from_c");
+			py::object reader = import_func(reinterpret_cast<uint64_t>(stream));
+			// _import_from_c takes ownership of the stream; null it out to prevent a capsule double-free.
+			stream->release = nullptr;
+			auto &import_cache = *DuckDBPyConnection::ImportCache();
+			py::object arrow_batch_scanner = import_cache.pyarrow.dataset.Scanner().attr("from_batches");
+			py::handle reader_handle = reader;
+			auto scanner = ProduceScanner(arrow_batch_scanner, reader_handle, parameters, factory->client_properties);
+			auto record_batches = scanner.attr("to_reader")();
+			auto res = make_uniq<ArrowArrayStreamWrapper>();
+			auto export_to_c = record_batches.attr("_export_to_c");
+			export_to_c(reinterpret_cast<uint64_t>(&res->arrow_array_stream));
+			return res;
+		} else {
+			// Tier B: pyarrow.dataset is unavailable, so return the raw stream (no pushdown).
+			// DuckDB applies projection/filter after the scan via arrow_scan_dumb.
+			auto res = make_uniq<ArrowArrayStreamWrapper>();
+			res->arrow_array_stream = *stream;
+			stream->release = nullptr;
+			return res;
+		}
+	}
+
 	if (arrow_object_type == PyArrowObjectType::PyCapsule) {
 		auto res = make_uniq<ArrowArrayStreamWrapper>();
 		auto capsule = py::reinterpret_borrow<py::capsule>(arrow_obj_handle);
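The new branch consumes the Arrow PyCapsule interface: any Python object exposing `__arrow_c_stream__` becomes scannable, whether or not pyarrow.dataset is installed. A minimal producer-side sketch (class and variable names are illustrative; assumes pyarrow >= 14, where `Table.__arrow_c_stream__` exists, and a DuckDB build containing this patch):

```python
import duckdb
import pyarrow as pa

class StreamOnly:
    """Hypothetical object that exposes only __arrow_c_stream__."""

    def __init__(self, table: pa.Table):
        self._table = table

    def __arrow_c_stream__(self, requested_schema=None):
        # Returns a PyCapsule wrapping an ArrowArrayStream; the branch above
        # pulls the stream pointer out of this capsule.
        return self._table.__arrow_c_stream__(requested_schema)

obj = StreamOnly(pa.table({"i": [1, 2, 3]}))
duckdb.sql("SELECT sum(i) FROM obj").show()  # replacement scan picks up `obj`
```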
@@ -78,21 +117,12 @@ unique_ptr<ArrowArrayStreamWrapper> PythonTableArrowArrayStreamFactory::Produce(
 		return res;
 	}
 
+	// Scanner and Dataset objects require pyarrow.dataset for pushdown.
+	VerifyArrowDatasetLoaded();
 	auto &import_cache = *DuckDBPyConnection::ImportCache();
 	py::object scanner;
 	py::object arrow_batch_scanner = import_cache.pyarrow.dataset.Scanner().attr("from_batches");
 	switch (arrow_object_type) {
-	case PyArrowObjectType::Table: {
-		auto arrow_dataset = import_cache.pyarrow.dataset().attr("dataset");
-		auto dataset = arrow_dataset(arrow_obj_handle);
-		py::object arrow_scanner = dataset.attr("__class__").attr("scanner");
-		scanner = ProduceScanner(arrow_scanner, dataset, parameters, factory->client_properties);
-		break;
-	}
-	case PyArrowObjectType::RecordBatchReader: {
-		scanner = ProduceScanner(arrow_batch_scanner, arrow_obj_handle, parameters, factory->client_properties);
-		break;
-	}
 	case PyArrowObjectType::Scanner: {
 		// If it's a Scanner we have to turn it into a RecordBatchReader and then a Scanner again, since we can't
 		// stack scanners on Arrow. Otherwise pushed-down projections and filters will disappear like tears in the rain.
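The deleted Table and RecordBatchReader cases are not lost functionality: both types implement `__arrow_c_stream__` in recent pyarrow, so they presumably now route through the PyCapsuleInterface branch above. On the Python side, Tier A's `ProduceScanner` amounts to roughly the following (a sketch; `columns` and `filter` stand in for whatever DuckDB actually pushes down):

```python
import pyarrow as pa
import pyarrow.dataset as ds

reader = pa.table({"i": [1, 2, 3], "j": ["a", "b", "c"]}).to_reader()
scanner = ds.Scanner.from_batches(
    reader,
    columns=["i"],             # projection pushdown
    filter=ds.field("i") > 1,  # filter pushdown
)
# Scanner.to_reader() yields the projected/filtered stream that the C++ code
# then exports into res->arrow_array_stream via _export_to_c.
pushed_down = scanner.to_reader()
print(pushed_down.read_all())
```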
@@ -119,37 +149,29 @@ unique_ptr<ArrowArrayStreamWrapper> PythonTableArrowArrayStreamFactory::Produce(
 }
 
 void PythonTableArrowArrayStreamFactory::GetSchemaInternal(py::handle arrow_obj_handle, ArrowSchemaWrapper &schema) {
+	// PyCapsule (from the bare-capsule Produce path)
 	if (py::isinstance<py::capsule>(arrow_obj_handle)) {
 		auto capsule = py::reinterpret_borrow<py::capsule>(arrow_obj_handle);
 		auto stream = capsule.get_pointer<struct ArrowArrayStream>();
 		if (!stream->release) {
 			throw InternalException("ArrowArrayStream was released by another thread/library");
 		}
-		stream->get_schema(stream, &schema.arrow_schema);
-		return;
-	}
-
-	auto table_class = py::module::import("pyarrow").attr("Table");
-	if (py::isinstance(arrow_obj_handle, table_class)) {
-		auto obj_schema = arrow_obj_handle.attr("schema");
-		auto export_to_c = obj_schema.attr("_export_to_c");
-		export_to_c(reinterpret_cast<uint64_t>(&schema.arrow_schema));
+		if (stream->get_schema(stream, &schema.arrow_schema)) {
+			throw InvalidInputException("Failed to get Arrow schema from stream: %s",
+			                            stream->get_last_error ? stream->get_last_error(stream) : "unknown error");
+		}
 		return;
 	}
 
+	// Scanner: use projected_schema; everything else (RecordBatchReader, Dataset) uses .schema
 	VerifyArrowDatasetLoaded();
-
 	auto &import_cache = *DuckDBPyConnection::ImportCache();
-	auto scanner_class = import_cache.pyarrow.dataset.Scanner();
-
-	if (py::isinstance(arrow_obj_handle, scanner_class)) {
+	if (py::isinstance(arrow_obj_handle, import_cache.pyarrow.dataset.Scanner())) {
 		auto obj_schema = arrow_obj_handle.attr("projected_schema");
-		auto export_to_c = obj_schema.attr("_export_to_c");
-		export_to_c(reinterpret_cast<uint64_t>(&schema));
+		obj_schema.attr("_export_to_c")(reinterpret_cast<uint64_t>(&schema.arrow_schema));
 	} else {
 		auto obj_schema = arrow_obj_handle.attr("schema");
-		auto export_to_c = obj_schema.attr("_export_to_c");
-		export_to_c(reinterpret_cast<uint64_t>(&schema));
+		obj_schema.attr("_export_to_c")(reinterpret_cast<uint64_t>(&schema.arrow_schema));
 	}
 }
 
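The Scanner branch exports `projected_schema` rather than `schema` so that DuckDB sees the post-pushdown column set. The distinction is easy to observe from Python (illustrative):

```python
import pyarrow as pa
import pyarrow.dataset as ds

reader = pa.table({"i": [1, 2, 3], "j": ["a", "b", "c"]}).to_reader()
scanner = ds.Scanner.from_batches(reader, columns=["i"])
print(scanner.projected_schema)  # only "i": what the Scanner branch exports
```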
@@ -158,6 +180,36 @@ void PythonTableArrowArrayStreamFactory::GetSchema(uintptr_t factory_ptr, ArrowS
 	auto factory = static_cast<PythonTableArrowArrayStreamFactory *>(reinterpret_cast<void *>(factory_ptr)); // NOLINT
 	D_ASSERT(factory->arrow_object);
 	py::handle arrow_obj_handle(factory->arrow_object);
+
+	auto type = DuckDBPyConnection::GetArrowType(arrow_obj_handle);
+	if (type == PyArrowObjectType::PyCapsuleInterface) {
+		// Prefer __arrow_c_schema__ if the object implements it.
+		if (py::hasattr(arrow_obj_handle, "__arrow_c_schema__")) {
+			auto schema_capsule = arrow_obj_handle.attr("__arrow_c_schema__")();
+			auto capsule = py::reinterpret_borrow<py::capsule>(schema_capsule);
+			auto arrow_schema = capsule.get_pointer<struct ArrowSchema>();
+			schema.arrow_schema = *arrow_schema;
+			arrow_schema->release = nullptr; // take ownership so the capsule destructor doesn't free it
+			return;
+		}
+		// Otherwise try a .schema attribute that supports _export_to_c.
+		if (py::hasattr(arrow_obj_handle, "schema")) {
+			auto obj_schema = arrow_obj_handle.attr("schema");
+			if (py::hasattr(obj_schema, "_export_to_c")) {
+				obj_schema.attr("_export_to_c")(reinterpret_cast<uint64_t>(&schema.arrow_schema));
+				return;
+			}
+		}
+		// Fallback: create a temporary stream just for the schema (consumes single-use streams!).
+		auto stream_capsule = arrow_obj_handle.attr("__arrow_c_stream__")();
+		auto capsule = py::reinterpret_borrow<py::capsule>(stream_capsule);
+		auto stream = capsule.get_pointer<struct ArrowArrayStream>();
+		if (stream->get_schema(stream, &schema.arrow_schema)) {
+			throw InvalidInputException("Failed to get Arrow schema from stream: %s",
+			                            stream->get_last_error ? stream->get_last_error(stream) : "unknown error");
+		}
+		return; // stream_capsule goes out of scope; the capsule destructor releases the stream
+	}
 	GetSchemaInternal(arrow_obj_handle, schema);
 }
 
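This is the discovery order the Produce error message asks producers to satisfy: advertise the schema via `__arrow_c_schema__` (or a `.schema` attribute with `_export_to_c`) so the fallback, which consumes a single-use stream, is never reached. Extending the earlier sketch (names illustrative, pyarrow >= 14 assumed):

```python
import pyarrow as pa

class SchemaAwareStream:
    """Hypothetical single-use producer that also advertises its schema,
    so GetSchema never has to burn the stream just to learn the types."""

    def __init__(self, table: pa.Table):
        self._table = table

    def __arrow_c_schema__(self):
        # Hits the first tier above: the ArrowSchema is copied out of the
        # capsule and ownership is taken by nulling its release callback.
        return self._table.schema.__arrow_c_schema__()

    def __arrow_c_stream__(self, requested_schema=None):
        return self._table.__arrow_c_stream__(requested_schema)
```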