Skip to content

Commit 60eb393

Browse files
committed
Support polars lazyframe projection and filter pushdown
1 parent 58e68f6 commit 60eb393

10 files changed

Lines changed: 485 additions & 8 deletions

File tree

scripts/cache_data.json

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -530,7 +530,9 @@
530530
"name": "polars",
531531
"children": [
532532
"polars.DataFrame",
533-
"polars.LazyFrame"
533+
"polars.LazyFrame",
534+
"polars.col",
535+
"polars.lit"
534536
],
535537
"required": false
536538
},
@@ -808,5 +810,17 @@
808810
"full_path": "typing.Union",
809811
"name": "Union",
810812
"children": []
813+
},
814+
"polars.col": {
815+
"type": "attribute",
816+
"full_path": "polars.col",
817+
"name": "col",
818+
"children": []
819+
},
820+
"polars.lit": {
821+
"type": "attribute",
822+
"full_path": "polars.lit",
823+
"name": "lit",
824+
"children": []
811825
}
812826
}

scripts/imports.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,8 @@
109109

110110
polars.DataFrame
111111
polars.LazyFrame
112+
polars.col
113+
polars.lit
112114

113115
import duckdb
114116
import duckdb.filesystem

src/duckdb_py/arrow/CMakeLists.txt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# this is used for clang-tidy checks
2-
add_library(python_arrow OBJECT arrow_array_stream.cpp arrow_export_utils.cpp
3-
pyarrow_filter_pushdown.cpp)
2+
add_library(
3+
python_arrow OBJECT arrow_array_stream.cpp arrow_export_utils.cpp
4+
polars_filter_pushdown.cpp pyarrow_filter_pushdown.cpp)
45

56
target_link_libraries(python_arrow PRIVATE _duckdb_dependencies)

src/duckdb_py/arrow/arrow_array_stream.cpp

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "duckdb_python/arrow/arrow_array_stream.hpp"
2+
#include "duckdb_python/arrow/polars_filter_pushdown.hpp"
23
#include "duckdb_python/arrow/pyarrow_filter_pushdown.hpp"
34

45
#include "duckdb_python/pyconnection/pyconnection.hpp"
@@ -66,6 +67,55 @@ unique_ptr<ArrowArrayStreamWrapper> PythonTableArrowArrayStreamFactory::Produce(
6667
py::handle arrow_obj_handle(factory->arrow_object);
6768
auto arrow_object_type = factory->cached_arrow_type;
6869

70+
if (arrow_object_type == PyArrowObjectType::PolarsLazyFrame) {
71+
py::object lf = py::reinterpret_borrow<py::object>(arrow_obj_handle);
72+
73+
auto filters = parameters.filters;
74+
bool filters_pushed = false;
75+
76+
// Translate DuckDB filters to Polars expressions and push into the lazy plan
77+
if (filters && !filters->filters.empty()) {
78+
try {
79+
auto filter_expr = PolarsFilterPushdown::TransformFilter(
80+
*filters, parameters.projected_columns.projection_map, parameters.projected_columns.filter_to_col,
81+
factory->client_properties);
82+
if (!filter_expr.is(py::none())) {
83+
lf = lf.attr("filter")(filter_expr);
84+
filters_pushed = true;
85+
}
86+
} catch (...) {
87+
// Fallback: DuckDB handles filtering post-scan
88+
}
89+
}
90+
91+
// If no filters were pushed and we have a cached Arrow table, reuse it. This avoids re-reading from source and
92+
// re-converting on repeated unfiltered scans.
93+
py::object arrow_table;
94+
if (!filters_pushed && factory->cached_arrow_table.ptr() != nullptr) {
95+
arrow_table = factory->cached_arrow_table;
96+
} else {
97+
arrow_table = lf.attr("collect")().attr("to_arrow")();
98+
// Cache only unfiltered results (filtered results are partial)
99+
if (!filters_pushed) {
100+
factory->cached_arrow_table = arrow_table;
101+
}
102+
}
103+
104+
// Apply column projection
105+
auto &column_list = parameters.projected_columns.columns;
106+
if (!column_list.empty()) {
107+
arrow_table = arrow_table.attr("select")(py::cast(column_list));
108+
}
109+
110+
auto capsule_obj = arrow_table.attr("__arrow_c_stream__")();
111+
auto capsule = py::reinterpret_borrow<py::capsule>(capsule_obj);
112+
auto stream = capsule.get_pointer<struct ArrowArrayStream>();
113+
auto res = make_uniq<ArrowArrayStreamWrapper>();
114+
res->arrow_array_stream = *stream;
115+
stream->release = nullptr;
116+
return res;
117+
}
118+
69119
if (arrow_object_type == PyArrowObjectType::PyCapsuleInterface || arrow_object_type == PyArrowObjectType::Table) {
70120
py::object capsule_obj = arrow_obj_handle.attr("__arrow_c_stream__")();
71121
auto capsule = py::reinterpret_borrow<py::capsule>(capsule_obj);
@@ -190,6 +240,20 @@ void PythonTableArrowArrayStreamFactory::GetSchema(uintptr_t factory_ptr, ArrowS
190240
py::handle arrow_obj_handle(factory->arrow_object);
191241

192242
auto type = factory->cached_arrow_type;
243+
if (type == PyArrowObjectType::PolarsLazyFrame) {
244+
// head(0).collect().to_arrow() gives the Arrow-exported schema (e.g. large_string) without materializing data.
245+
// collect_schema() would give Polars-native types (e.g. string_view) that don't match the actual export.
246+
const auto empty_arrow = arrow_obj_handle.attr("head")(0).attr("collect")().attr("to_arrow")();
247+
const auto schema_capsule = empty_arrow.attr("schema").attr("__arrow_c_schema__")();
248+
const auto capsule = py::reinterpret_borrow<py::capsule>(schema_capsule);
249+
const auto arrow_schema = capsule.get_pointer<struct ArrowSchema>();
250+
factory->cached_schema = *arrow_schema;
251+
arrow_schema->release = nullptr;
252+
factory->schema_cached = true;
253+
schema.arrow_schema = factory->cached_schema;
254+
schema.arrow_schema.release = nullptr;
255+
return;
256+
}
193257
if (type == PyArrowObjectType::PyCapsuleInterface || type == PyArrowObjectType::Table) {
194258
// Get __arrow_c_schema__ if it exists
195259
if (py::hasattr(arrow_obj_handle, "__arrow_c_schema__")) {
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
#include "duckdb_python/arrow/polars_filter_pushdown.hpp"
2+
3+
#include "duckdb/planner/filter/in_filter.hpp"
4+
#include "duckdb/planner/filter/optional_filter.hpp"
5+
#include "duckdb/planner/filter/conjunction_filter.hpp"
6+
#include "duckdb/planner/filter/constant_filter.hpp"
7+
#include "duckdb/planner/filter/struct_filter.hpp"
8+
#include "duckdb/planner/table_filter.hpp"
9+
10+
#include "duckdb_python/pyconnection/pyconnection.hpp"
11+
#include "duckdb_python/python_objects.hpp"
12+
13+
namespace duckdb {
14+
15+
static py::object TransformFilterRecursive(TableFilter &filter, py::object col_expr,
16+
const ClientProperties &client_properties) {
17+
auto &import_cache = *DuckDBPyConnection::ImportCache();
18+
19+
switch (filter.filter_type) {
20+
case TableFilterType::CONSTANT_COMPARISON: {
21+
auto &constant_filter = filter.Cast<ConstantFilter>();
22+
auto &constant = constant_filter.constant;
23+
auto &constant_type = constant.type();
24+
25+
// Check for NaN
26+
bool is_nan = false;
27+
if (constant_type.id() == LogicalTypeId::FLOAT) {
28+
is_nan = Value::IsNan(constant.GetValue<float>());
29+
} else if (constant_type.id() == LogicalTypeId::DOUBLE) {
30+
is_nan = Value::IsNan(constant.GetValue<double>());
31+
}
32+
33+
if (is_nan) {
34+
switch (constant_filter.comparison_type) {
35+
case ExpressionType::COMPARE_EQUAL:
36+
case ExpressionType::COMPARE_GREATERTHANOREQUALTO:
37+
return col_expr.attr("is_nan")();
38+
case ExpressionType::COMPARE_LESSTHAN:
39+
case ExpressionType::COMPARE_NOTEQUAL:
40+
return col_expr.attr("is_nan")().attr("__invert__")();
41+
case ExpressionType::COMPARE_GREATERTHAN:
42+
return import_cache.polars.lit()(false);
43+
case ExpressionType::COMPARE_LESSTHANOREQUALTO:
44+
return import_cache.polars.lit()(true);
45+
default:
46+
return py::none();
47+
}
48+
}
49+
50+
// Convert DuckDB Value to Python object
51+
auto py_value = PythonObject::FromValue(constant, constant_type, client_properties);
52+
53+
switch (constant_filter.comparison_type) {
54+
case ExpressionType::COMPARE_EQUAL:
55+
return col_expr.attr("__eq__")(py_value);
56+
case ExpressionType::COMPARE_LESSTHAN:
57+
return col_expr.attr("__lt__")(py_value);
58+
case ExpressionType::COMPARE_GREATERTHAN:
59+
return col_expr.attr("__gt__")(py_value);
60+
case ExpressionType::COMPARE_LESSTHANOREQUALTO:
61+
return col_expr.attr("__le__")(py_value);
62+
case ExpressionType::COMPARE_GREATERTHANOREQUALTO:
63+
return col_expr.attr("__ge__")(py_value);
64+
case ExpressionType::COMPARE_NOTEQUAL:
65+
return col_expr.attr("__ne__")(py_value);
66+
default:
67+
return py::none();
68+
}
69+
}
70+
case TableFilterType::IS_NULL: {
71+
return col_expr.attr("is_null")();
72+
}
73+
case TableFilterType::IS_NOT_NULL: {
74+
return col_expr.attr("is_not_null")();
75+
}
76+
case TableFilterType::CONJUNCTION_AND: {
77+
auto &and_filter = filter.Cast<ConjunctionAndFilter>();
78+
py::object expression = py::none();
79+
for (idx_t i = 0; i < and_filter.child_filters.size(); i++) {
80+
auto child_expression = TransformFilterRecursive(*and_filter.child_filters[i], col_expr, client_properties);
81+
if (child_expression.is(py::none())) {
82+
continue;
83+
}
84+
if (expression.is(py::none())) {
85+
expression = std::move(child_expression);
86+
} else {
87+
expression = expression.attr("__and__")(child_expression);
88+
}
89+
}
90+
return expression;
91+
}
92+
case TableFilterType::CONJUNCTION_OR: {
93+
auto &or_filter = filter.Cast<ConjunctionOrFilter>();
94+
py::object expression = py::none();
95+
for (idx_t i = 0; i < or_filter.child_filters.size(); i++) {
96+
auto child_expression = TransformFilterRecursive(*or_filter.child_filters[i], col_expr, client_properties);
97+
if (child_expression.is(py::none())) {
98+
// Can't skip children in OR
99+
return py::none();
100+
}
101+
if (expression.is(py::none())) {
102+
expression = std::move(child_expression);
103+
} else {
104+
expression = expression.attr("__or__")(child_expression);
105+
}
106+
}
107+
return expression;
108+
}
109+
case TableFilterType::STRUCT_EXTRACT: {
110+
auto &struct_filter = filter.Cast<StructFilter>();
111+
auto child_col = col_expr.attr("struct").attr("field")(struct_filter.child_name);
112+
return TransformFilterRecursive(*struct_filter.child_filter, child_col, client_properties);
113+
}
114+
case TableFilterType::IN_FILTER: {
115+
auto &in_filter = filter.Cast<InFilter>();
116+
py::list py_values;
117+
for (const auto &value : in_filter.values) {
118+
py_values.append(PythonObject::FromValue(value, value.type(), client_properties));
119+
}
120+
return col_expr.attr("is_in")(py_values);
121+
}
122+
case TableFilterType::OPTIONAL_FILTER: {
123+
auto &optional_filter = filter.Cast<OptionalFilter>();
124+
if (!optional_filter.child_filter) {
125+
return py::none();
126+
}
127+
return TransformFilterRecursive(*optional_filter.child_filter, col_expr, client_properties);
128+
}
129+
default:
130+
// We skip DYNAMIC_FILTER, EXPRESSION_FILTER, BLOOM_FILTER
131+
return py::none();
132+
}
133+
}
134+
135+
py::object PolarsFilterPushdown::TransformFilter(const TableFilterSet &filter_collection,
136+
unordered_map<idx_t, string> &columns,
137+
const unordered_map<idx_t, idx_t> &filter_to_col,
138+
const ClientProperties &client_properties) {
139+
auto &import_cache = *DuckDBPyConnection::ImportCache();
140+
auto &filters_map = filter_collection.filters;
141+
142+
py::object expression = py::none();
143+
for (auto &it : filters_map) {
144+
auto column_idx = it.first;
145+
auto &column_name = columns[column_idx];
146+
auto col_expr = import_cache.polars.col()(column_name);
147+
148+
auto child_expression = TransformFilterRecursive(*it.second, col_expr, client_properties);
149+
if (child_expression.is(py::none())) {
150+
continue;
151+
}
152+
if (expression.is(py::none())) {
153+
expression = std::move(child_expression);
154+
} else {
155+
expression = expression.attr("__and__")(child_expression);
156+
}
157+
}
158+
return expression;
159+
}
160+
161+
} // namespace duckdb

src/duckdb_py/include/duckdb_python/arrow/arrow_array_stream.hpp

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,16 @@ class Table : public py::object {
5151

5252
} // namespace pyarrow
5353

54-
enum class PyArrowObjectType { Invalid, Table, Scanner, Dataset, PyCapsule, PyCapsuleInterface, MessageReader };
54+
enum class PyArrowObjectType {
55+
Invalid,
56+
Table,
57+
Scanner,
58+
Dataset,
59+
PyCapsule,
60+
PyCapsuleInterface,
61+
MessageReader,
62+
PolarsLazyFrame
63+
};
5564

5665
void TransformDuckToArrowChunk(ArrowSchema &arrow_schema, ArrowArray &data, py::list &batches);
5766

@@ -66,6 +75,10 @@ class PythonTableArrowArrayStreamFactory {
6675
}
6776

6877
~PythonTableArrowArrayStreamFactory() {
78+
if (cached_arrow_table.ptr() != nullptr) {
79+
py::gil_scoped_acquire acquire;
80+
cached_arrow_table = py::object();
81+
}
6982
if (cached_schema.release) {
7083
cached_schema.release(&cached_schema);
7184
}
@@ -84,6 +97,10 @@ class PythonTableArrowArrayStreamFactory {
8497
const ClientProperties client_properties;
8598
const PyArrowObjectType cached_arrow_type;
8699

100+
//! Cached Arrow table from an unfiltered .collect().to_arrow() on a LazyFrame.
101+
//! Avoids re-reading from source and re-converting on repeated scans without filters.
102+
py::object cached_arrow_table;
103+
87104
private:
88105
ArrowSchema cached_schema;
89106
bool schema_cached = false;
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
//===----------------------------------------------------------------------===//
2+
// DuckDB
3+
//
4+
// duckdb_python/arrow/polars_filter_pushdown.hpp
5+
//
6+
//
7+
//===----------------------------------------------------------------------===//
8+
9+
#pragma once
10+
11+
#include "duckdb/common/unordered_map.hpp"
12+
#include "duckdb/planner/table_filter.hpp"
13+
#include "duckdb/main/client_properties.hpp"
14+
#include "duckdb_python/pybind11/pybind_wrapper.hpp"
15+
16+
namespace duckdb {
17+
18+
struct PolarsFilterPushdown {
19+
static py::object TransformFilter(const TableFilterSet &filter_collection, unordered_map<idx_t, string> &columns,
20+
const unordered_map<idx_t, idx_t> &filter_to_col,
21+
const ClientProperties &client_properties);
22+
};
23+
24+
} // namespace duckdb

src/duckdb_py/include/duckdb_python/import_cache/modules/polars_module.hpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,17 @@ struct PolarsCacheItem : public PythonImportCacheItem {
2626
static constexpr const char *Name = "polars";
2727

2828
public:
29-
PolarsCacheItem() : PythonImportCacheItem("polars"), DataFrame("DataFrame", this), LazyFrame("LazyFrame", this) {
29+
PolarsCacheItem()
30+
: PythonImportCacheItem("polars"), DataFrame("DataFrame", this), LazyFrame("LazyFrame", this), col("col", this),
31+
lit("lit", this) {
3032
}
3133
~PolarsCacheItem() override {
3234
}
3335

3436
PythonImportCacheItem DataFrame;
3537
PythonImportCacheItem LazyFrame;
38+
PythonImportCacheItem col;
39+
PythonImportCacheItem lit;
3640

3741
protected:
3842
bool IsRequired() const override final {

src/duckdb_py/python_replacement_scan.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,9 +152,7 @@ unique_ptr<TableRef> PythonReplacementScan::TryReplacementObject(const py::objec
152152
CreateArrowScan(name, arrow_dataset, *table_function, children, client_properties, PyArrowObjectType::Table,
153153
*context.db);
154154
} else if (PolarsDataFrame::IsLazyFrame(entry)) {
155-
auto materialized = entry.attr("collect")();
156-
auto arrow_dataset = materialized.attr("to_arrow")();
157-
CreateArrowScan(name, arrow_dataset, *table_function, children, client_properties, PyArrowObjectType::Table,
155+
CreateArrowScan(name, entry, *table_function, children, client_properties, PyArrowObjectType::PolarsLazyFrame,
158156
*context.db);
159157
} else if ((arrow_type = DuckDBPyConnection::GetArrowType(entry)) != PyArrowObjectType::Invalid &&
160158
!(arrow_type == PyArrowObjectType::MessageReader && !relation)) {

0 commit comments

Comments
 (0)