Skip to content

Commit 542ecd6

Browse files
Support polars lazyframe projection and filter pushdown (#334)
This translates DuckDB filters into Polars expressions and pushes them into the lazy plan before collection, so only matching rows are materialized. Column projections are applied via table.select() on the resulting Arrow table.
2 parents 632544f + 60eb393 commit 542ecd6

10 files changed

Lines changed: 485 additions & 8 deletions

File tree

scripts/cache_data.json

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -530,7 +530,9 @@
530530
"name": "polars",
531531
"children": [
532532
"polars.DataFrame",
533-
"polars.LazyFrame"
533+
"polars.LazyFrame",
534+
"polars.col",
535+
"polars.lit"
534536
],
535537
"required": false
536538
},
@@ -808,5 +810,17 @@
808810
"full_path": "typing.Union",
809811
"name": "Union",
810812
"children": []
813+
},
814+
"polars.col": {
815+
"type": "attribute",
816+
"full_path": "polars.col",
817+
"name": "col",
818+
"children": []
819+
},
820+
"polars.lit": {
821+
"type": "attribute",
822+
"full_path": "polars.lit",
823+
"name": "lit",
824+
"children": []
811825
}
812826
}

scripts/imports.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,8 @@
109109

110110
polars.DataFrame
111111
polars.LazyFrame
112+
polars.col
113+
polars.lit
112114

113115
import duckdb
114116
import duckdb.filesystem

src/duckdb_py/arrow/CMakeLists.txt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# this is used for clang-tidy checks
2-
add_library(python_arrow OBJECT arrow_array_stream.cpp arrow_export_utils.cpp
3-
pyarrow_filter_pushdown.cpp)
2+
add_library(
3+
python_arrow OBJECT arrow_array_stream.cpp arrow_export_utils.cpp
4+
polars_filter_pushdown.cpp pyarrow_filter_pushdown.cpp)
45

56
target_link_libraries(python_arrow PRIVATE _duckdb_dependencies)

src/duckdb_py/arrow/arrow_array_stream.cpp

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "duckdb_python/arrow/arrow_array_stream.hpp"
2+
#include "duckdb_python/arrow/polars_filter_pushdown.hpp"
23
#include "duckdb_python/arrow/pyarrow_filter_pushdown.hpp"
34

45
#include "duckdb_python/pyconnection/pyconnection.hpp"
@@ -66,6 +67,55 @@ unique_ptr<ArrowArrayStreamWrapper> PythonTableArrowArrayStreamFactory::Produce(
6667
py::handle arrow_obj_handle(factory->arrow_object);
6768
auto arrow_object_type = factory->cached_arrow_type;
6869

70+
if (arrow_object_type == PyArrowObjectType::PolarsLazyFrame) {
71+
py::object lf = py::reinterpret_borrow<py::object>(arrow_obj_handle);
72+
73+
auto filters = parameters.filters;
74+
bool filters_pushed = false;
75+
76+
// Translate DuckDB filters to Polars expressions and push into the lazy plan
77+
if (filters && !filters->filters.empty()) {
78+
try {
79+
auto filter_expr = PolarsFilterPushdown::TransformFilter(
80+
*filters, parameters.projected_columns.projection_map, parameters.projected_columns.filter_to_col,
81+
factory->client_properties);
82+
if (!filter_expr.is(py::none())) {
83+
lf = lf.attr("filter")(filter_expr);
84+
filters_pushed = true;
85+
}
86+
} catch (...) {
87+
// Fallback: DuckDB handles filtering post-scan
88+
}
89+
}
90+
91+
// If no filters were pushed and we have a cached Arrow table, reuse it. This avoids re-reading from source and
92+
// re-converting on repeated unfiltered scans.
93+
py::object arrow_table;
94+
if (!filters_pushed && factory->cached_arrow_table.ptr() != nullptr) {
95+
arrow_table = factory->cached_arrow_table;
96+
} else {
97+
arrow_table = lf.attr("collect")().attr("to_arrow")();
98+
// Cache only unfiltered results (filtered results are partial)
99+
if (!filters_pushed) {
100+
factory->cached_arrow_table = arrow_table;
101+
}
102+
}
103+
104+
// Apply column projection
105+
auto &column_list = parameters.projected_columns.columns;
106+
if (!column_list.empty()) {
107+
arrow_table = arrow_table.attr("select")(py::cast(column_list));
108+
}
109+
110+
auto capsule_obj = arrow_table.attr("__arrow_c_stream__")();
111+
auto capsule = py::reinterpret_borrow<py::capsule>(capsule_obj);
112+
auto stream = capsule.get_pointer<struct ArrowArrayStream>();
113+
auto res = make_uniq<ArrowArrayStreamWrapper>();
114+
res->arrow_array_stream = *stream;
115+
stream->release = nullptr;
116+
return res;
117+
}
118+
69119
if (arrow_object_type == PyArrowObjectType::PyCapsuleInterface || arrow_object_type == PyArrowObjectType::Table) {
70120
py::object capsule_obj = arrow_obj_handle.attr("__arrow_c_stream__")();
71121
auto capsule = py::reinterpret_borrow<py::capsule>(capsule_obj);
@@ -190,6 +240,20 @@ void PythonTableArrowArrayStreamFactory::GetSchema(uintptr_t factory_ptr, ArrowS
190240
py::handle arrow_obj_handle(factory->arrow_object);
191241

192242
auto type = factory->cached_arrow_type;
243+
if (type == PyArrowObjectType::PolarsLazyFrame) {
244+
// head(0).collect().to_arrow() gives the Arrow-exported schema (e.g. large_string) without materializing data.
245+
// collect_schema() would give Polars-native types (e.g. string_view) that don't match the actual export.
246+
const auto empty_arrow = arrow_obj_handle.attr("head")(0).attr("collect")().attr("to_arrow")();
247+
const auto schema_capsule = empty_arrow.attr("schema").attr("__arrow_c_schema__")();
248+
const auto capsule = py::reinterpret_borrow<py::capsule>(schema_capsule);
249+
const auto arrow_schema = capsule.get_pointer<struct ArrowSchema>();
250+
factory->cached_schema = *arrow_schema;
251+
arrow_schema->release = nullptr;
252+
factory->schema_cached = true;
253+
schema.arrow_schema = factory->cached_schema;
254+
schema.arrow_schema.release = nullptr;
255+
return;
256+
}
193257
if (type == PyArrowObjectType::PyCapsuleInterface || type == PyArrowObjectType::Table) {
194258
// Get __arrow_c_schema__ if it exists
195259
if (py::hasattr(arrow_obj_handle, "__arrow_c_schema__")) {
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
#include "duckdb_python/arrow/polars_filter_pushdown.hpp"
2+
3+
#include "duckdb/planner/filter/in_filter.hpp"
4+
#include "duckdb/planner/filter/optional_filter.hpp"
5+
#include "duckdb/planner/filter/conjunction_filter.hpp"
6+
#include "duckdb/planner/filter/constant_filter.hpp"
7+
#include "duckdb/planner/filter/struct_filter.hpp"
8+
#include "duckdb/planner/table_filter.hpp"
9+
10+
#include "duckdb_python/pyconnection/pyconnection.hpp"
11+
#include "duckdb_python/python_objects.hpp"
12+
13+
namespace duckdb {
14+
15+
//! Translates a comparison of `col_expr` against a NaN constant into a Polars expression.
//! The mapping treats NaN as equal to itself and greater than every other value, so each comparison
//! collapses to a test on is_nan(), a null check, or a constant.
//! Returns None for comparison types that have no translation.
static py::object TransformNanComparison(ExpressionType comparison_type, const py::object &col_expr) {
	switch (comparison_type) {
	case ExpressionType::COMPARE_EQUAL:
	case ExpressionType::COMPARE_GREATERTHANOREQUALTO:
		// Only NaN itself is == NaN or >= NaN
		return col_expr.attr("is_nan")();
	case ExpressionType::COMPARE_LESSTHAN:
	case ExpressionType::COMPARE_NOTEQUAL:
		// Everything except NaN is < NaN or != NaN
		return col_expr.attr("is_nan")().attr("__invert__")();
	case ExpressionType::COMPARE_GREATERTHAN: {
		// Nothing is greater than NaN
		auto &import_cache = *DuckDBPyConnection::ImportCache();
		return import_cache.polars.lit()(false);
	}
	case ExpressionType::COMPARE_LESSTHANOREQUALTO:
		// Every non-null value is <= NaN. A literal `true` would also keep NULL rows, which a SQL comparison
		// never matches (and which the other branches above already drop), so test for non-null instead.
		return col_expr.attr("is_not_null")();
	default:
		return py::none();
	}
}

//! Folds `next` into `accumulated` by calling the Polars operator method `combine_method`
//! ("__and__" or "__or__"). A None accumulator means nothing was collected yet, so `next` is returned as is.
static py::object CombineExpressions(py::object accumulated, py::object next, const char *combine_method) {
	if (accumulated.is(py::none())) {
		return next;
	}
	return accumulated.attr(combine_method)(next);
}

//! Recursively converts a DuckDB table filter on a single column into a Polars expression.
//!
//! \param filter            The DuckDB filter to translate
//! \param col_expr          Polars expression for the column (or struct field) the filter applies to
//! \param client_properties Forwarded to PythonObject::FromValue when converting constants to Python objects
//! \return The Polars expression, or None when the filter (or a required part of it) cannot be translated
static py::object TransformFilterRecursive(TableFilter &filter, py::object col_expr,
                                           const ClientProperties &client_properties) {
	switch (filter.filter_type) {
	case TableFilterType::CONSTANT_COMPARISON: {
		auto &constant_filter = filter.Cast<ConstantFilter>();
		auto &constant = constant_filter.constant;
		auto &constant_type = constant.type();

		// NaN constants need dedicated handling, see TransformNanComparison
		bool is_nan = false;
		if (constant_type.id() == LogicalTypeId::FLOAT) {
			is_nan = Value::IsNan(constant.GetValue<float>());
		} else if (constant_type.id() == LogicalTypeId::DOUBLE) {
			is_nan = Value::IsNan(constant.GetValue<double>());
		}
		if (is_nan) {
			return TransformNanComparison(constant_filter.comparison_type, col_expr);
		}

		// Convert the DuckDB Value to a Python object and let Polars build the comparison
		auto py_value = PythonObject::FromValue(constant, constant_type, client_properties);

		switch (constant_filter.comparison_type) {
		case ExpressionType::COMPARE_EQUAL:
			return col_expr.attr("__eq__")(py_value);
		case ExpressionType::COMPARE_LESSTHAN:
			return col_expr.attr("__lt__")(py_value);
		case ExpressionType::COMPARE_GREATERTHAN:
			return col_expr.attr("__gt__")(py_value);
		case ExpressionType::COMPARE_LESSTHANOREQUALTO:
			return col_expr.attr("__le__")(py_value);
		case ExpressionType::COMPARE_GREATERTHANOREQUALTO:
			return col_expr.attr("__ge__")(py_value);
		case ExpressionType::COMPARE_NOTEQUAL:
			return col_expr.attr("__ne__")(py_value);
		default:
			return py::none();
		}
	}
	case TableFilterType::IS_NULL:
		return col_expr.attr("is_null")();
	case TableFilterType::IS_NOT_NULL:
		return col_expr.attr("is_not_null")();
	case TableFilterType::CONJUNCTION_AND: {
		auto &and_filter = filter.Cast<ConjunctionAndFilter>();
		py::object expression = py::none();
		for (auto &child : and_filter.child_filters) {
			auto child_expression = TransformFilterRecursive(*child, col_expr, client_properties);
			if (child_expression.is(py::none())) {
				// Dropping a conjunct only makes the pushed filter less selective.
				// NOTE(review): this assumes the untranslated conjunct is still enforced elsewhere - confirm.
				continue;
			}
			expression = CombineExpressions(std::move(expression), std::move(child_expression), "__and__");
		}
		return expression;
	}
	case TableFilterType::CONJUNCTION_OR: {
		auto &or_filter = filter.Cast<ConjunctionOrFilter>();
		py::object expression = py::none();
		for (auto &child : or_filter.child_filters) {
			auto child_expression = TransformFilterRecursive(*child, col_expr, client_properties);
			if (child_expression.is(py::none())) {
				// Dropping a disjunct would wrongly exclude rows, so the whole OR becomes untranslatable
				return py::none();
			}
			expression = CombineExpressions(std::move(expression), std::move(child_expression), "__or__");
		}
		return expression;
	}
	case TableFilterType::STRUCT_EXTRACT: {
		// Re-target the filter at the struct field and translate the child filter against it
		auto &struct_filter = filter.Cast<StructFilter>();
		auto child_col = col_expr.attr("struct").attr("field")(struct_filter.child_name);
		return TransformFilterRecursive(*struct_filter.child_filter, child_col, client_properties);
	}
	case TableFilterType::IN_FILTER: {
		auto &in_filter = filter.Cast<InFilter>();
		py::list py_values;
		for (const auto &value : in_filter.values) {
			py_values.append(PythonObject::FromValue(value, value.type(), client_properties));
		}
		return col_expr.attr("is_in")(py_values);
	}
	case TableFilterType::OPTIONAL_FILTER: {
		auto &optional_filter = filter.Cast<OptionalFilter>();
		if (!optional_filter.child_filter) {
			return py::none();
		}
		return TransformFilterRecursive(*optional_filter.child_filter, col_expr, client_properties);
	}
	default:
		// We skip DYNAMIC_FILTER, EXPRESSION_FILTER, BLOOM_FILTER
		return py::none();
	}
}
134+
135+
//! Builds one Polars expression from all filters in `filter_collection`, AND-ing the per-column expressions.
//!
//! \param filter_collection Filters keyed by column index
//! \param columns           Maps each filter's column index to the column name passed to polars.col()
//! \param filter_to_col     Not used by this implementation; kept so the signature stays unchanged
//! \param client_properties Forwarded to the constant conversion in TransformFilterRecursive
//! \return The combined expression, or None when no filter could be translated
//! \throws InternalException when a filter refers to a column index that has no entry in `columns`
py::object PolarsFilterPushdown::TransformFilter(const TableFilterSet &filter_collection,
                                                 unordered_map<idx_t, string> &columns,
                                                 const unordered_map<idx_t, idx_t> &filter_to_col,
                                                 const ClientProperties &client_properties) {
	auto &import_cache = *DuckDBPyConnection::ImportCache();

	py::object expression = py::none();
	for (auto &entry : filter_collection.filters) {
		const auto column_idx = entry.first;

		// Look the name up with find(): operator[] would silently insert an empty name into the caller's map
		// and hand polars.col("") to Polars when the index is missing.
		const auto name_entry = columns.find(column_idx);
		if (name_entry == columns.end()) {
			throw InternalException("Polars filter pushdown: no column name for filter column index " +
			                        std::to_string(column_idx));
		}
		auto col_expr = import_cache.polars.col()(name_entry->second);

		auto child_expression = TransformFilterRecursive(*entry.second, col_expr, client_properties);
		if (child_expression.is(py::none())) {
			// Nothing translatable for this column; leave it out of the pushed expression
			continue;
		}
		if (expression.is(py::none())) {
			expression = std::move(child_expression);
		} else {
			expression = expression.attr("__and__")(child_expression);
		}
	}
	return expression;
}
160+
161+
} // namespace duckdb

src/duckdb_py/include/duckdb_python/arrow/arrow_array_stream.hpp

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,16 @@ class Table : public py::object {
5151

5252
} // namespace pyarrow
5353

54-
enum class PyArrowObjectType { Invalid, Table, Scanner, Dataset, PyCapsule, PyCapsuleInterface, MessageReader };
54+
enum class PyArrowObjectType {
55+
Invalid,
56+
Table,
57+
Scanner,
58+
Dataset,
59+
PyCapsule,
60+
PyCapsuleInterface,
61+
MessageReader,
62+
PolarsLazyFrame
63+
};
5564

5665
void TransformDuckToArrowChunk(ArrowSchema &arrow_schema, ArrowArray &data, py::list &batches);
5766

@@ -66,6 +75,10 @@ class PythonTableArrowArrayStreamFactory {
6675
}
6776

6877
~PythonTableArrowArrayStreamFactory() {
78+
if (cached_arrow_table.ptr() != nullptr) {
79+
py::gil_scoped_acquire acquire;
80+
cached_arrow_table = py::object();
81+
}
6982
if (cached_schema.release) {
7083
cached_schema.release(&cached_schema);
7184
}
@@ -84,6 +97,10 @@ class PythonTableArrowArrayStreamFactory {
8497
const ClientProperties client_properties;
8598
const PyArrowObjectType cached_arrow_type;
8699

100+
//! Cached Arrow table from an unfiltered .collect().to_arrow() on a LazyFrame.
101+
//! Avoids re-reading from source and re-converting on repeated scans without filters.
102+
py::object cached_arrow_table;
103+
87104
private:
88105
ArrowSchema cached_schema;
89106
bool schema_cached = false;
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
//===----------------------------------------------------------------------===//
2+
// DuckDB
3+
//
4+
// duckdb_python/arrow/polars_filter_pushdown.hpp
5+
//
6+
//
7+
//===----------------------------------------------------------------------===//
8+
9+
#pragma once
10+
11+
#include "duckdb/common/unordered_map.hpp"
12+
#include "duckdb/planner/table_filter.hpp"
13+
#include "duckdb/main/client_properties.hpp"
14+
#include "duckdb_python/pybind11/pybind_wrapper.hpp"
15+
16+
namespace duckdb {
17+
18+
struct PolarsFilterPushdown {
19+
static py::object TransformFilter(const TableFilterSet &filter_collection, unordered_map<idx_t, string> &columns,
20+
const unordered_map<idx_t, idx_t> &filter_to_col,
21+
const ClientProperties &client_properties);
22+
};
23+
24+
} // namespace duckdb

src/duckdb_py/include/duckdb_python/import_cache/modules/polars_module.hpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,17 @@ struct PolarsCacheItem : public PythonImportCacheItem {
2626
static constexpr const char *Name = "polars";
2727

2828
public:
29-
PolarsCacheItem() : PythonImportCacheItem("polars"), DataFrame("DataFrame", this), LazyFrame("LazyFrame", this) {
29+
PolarsCacheItem()
30+
: PythonImportCacheItem("polars"), DataFrame("DataFrame", this), LazyFrame("LazyFrame", this), col("col", this),
31+
lit("lit", this) {
3032
}
3133
~PolarsCacheItem() override {
3234
}
3335

3436
PythonImportCacheItem DataFrame;
3537
PythonImportCacheItem LazyFrame;
38+
PythonImportCacheItem col;
39+
PythonImportCacheItem lit;
3640

3741
protected:
3842
bool IsRequired() const override final {

src/duckdb_py/python_replacement_scan.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,9 +152,7 @@ unique_ptr<TableRef> PythonReplacementScan::TryReplacementObject(const py::objec
152152
CreateArrowScan(name, arrow_dataset, *table_function, children, client_properties, PyArrowObjectType::Table,
153153
*context.db);
154154
} else if (PolarsDataFrame::IsLazyFrame(entry)) {
155-
auto materialized = entry.attr("collect")();
156-
auto arrow_dataset = materialized.attr("to_arrow")();
157-
CreateArrowScan(name, arrow_dataset, *table_function, children, client_properties, PyArrowObjectType::Table,
155+
CreateArrowScan(name, entry, *table_function, children, client_properties, PyArrowObjectType::PolarsLazyFrame,
158156
*context.db);
159157
} else if ((arrow_type = DuckDBPyConnection::GetArrowType(entry)) != PyArrowObjectType::Invalid &&
160158
!(arrow_type == PyArrowObjectType::MessageReader && !relation)) {

0 commit comments

Comments
 (0)