Skip to content

Commit 37a91ca

Browse files
committed
Implement partitioning tests and benchmarking in investigate_config.py
1 parent 941221f commit 37a91ca

1 file changed

Lines changed: 86 additions & 0 deletions

File tree

investigate_config.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
#!/usr/bin/env python3
2+
3+
import math
4+
import time
5+
import pyarrow as pa
6+
from datafusion import SessionContext, col, SessionConfig
7+
from datafusion import functions as f
8+
9+
def check_target_partitions():
10+
"""Check the default target_partitions setting."""
11+
print("=== DataFusion Configuration ===")
12+
ctx = SessionContext()
13+
14+
# Create a simple test to see how many target partitions are configured
15+
batches = []
16+
for i in range(8):
17+
arr = pa.array(range(i*1000, (i+1)*1000))
18+
batches.append(pa.record_batch([arr], names=["a"]))
19+
20+
# Test with different partitioning
21+
partitions = [[batch] for batch in batches] # 8 partitions
22+
df = ctx.create_dataframe(partitions)
23+
df_agg = df.aggregate([], [f.sum(col("a"))])
24+
25+
print("Plan with 8 input partitions:")
26+
print(df_agg.explain())
27+
print("-" * 80)
28+
29+
def test_manual_config():
30+
"""Test with manually configured target_partitions."""
31+
print("=== Testing Manual Configuration ===")
32+
33+
# Create config with specific target_partitions
34+
config = SessionConfig({"datafusion.execution.target_partitions": "8"})
35+
ctx = SessionContext(config)
36+
37+
batches = []
38+
for i in range(100):
39+
start = i * 1_000_000
40+
arr = pa.array(range(start, start + 1_000_000))
41+
batches.append(pa.record_batch([arr], names=["a"]))
42+
43+
# Create 8 partitions
44+
partition_size = math.ceil(len(batches) / 8)
45+
partitions = [
46+
batches[i : i + partition_size] for i in range(0, len(batches), partition_size)
47+
]
48+
df = ctx.create_dataframe(partitions)
49+
df_agg = df.aggregate([], [f.sum(col("a"))])
50+
51+
print("Plan with manual target_partitions=8:")
52+
print(df_agg.explain())
53+
print("-" * 80)
54+
55+
def benchmark_with_repartitioning():
56+
"""Test performance with explicit repartitioning."""
57+
print("=== Benchmarking With Repartitioning ===")
58+
59+
ctx = SessionContext()
60+
batches = []
61+
for i in range(100):
62+
start = i * 1_000_000
63+
arr = pa.array(range(start, start + 1_000_000))
64+
batches.append(pa.record_batch([arr], names=["a"]))
65+
66+
# Create fewer partitions, let DataFusion repartition
67+
partitions = [batches] # All in one partition
68+
df = ctx.create_dataframe(partitions)
69+
70+
# Add an operation that might trigger repartitioning
71+
df_processed = df.select(col("a"), (col("a") * 2).alias("a2"))
72+
df_agg = df_processed.aggregate([], [f.sum(col("a")), f.sum(col("a2"))])
73+
74+
print("Plan with repartitioning:")
75+
print(df_agg.explain())
76+
77+
start = time.perf_counter()
78+
result = df_agg.collect()
79+
duration = time.perf_counter() - start
80+
print(f"Time with repartitioning: {duration:.3f}s")
81+
print("-" * 80)
82+
83+
if __name__ == "__main__":
84+
check_target_partitions()
85+
test_manual_config()
86+
benchmark_with_repartitioning()

0 commit comments

Comments
 (0)