Skip to content

Commit 84ed953

Browse files
committed
Implement benchmarking for different target_partitions configurations and performance testing without repartitioning
1 parent 37a91ca commit 84ed953

1 file changed

Lines changed: 105 additions & 0 deletions

File tree

test_repartitioning.py

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
#!/usr/bin/env python3
2+
3+
import math
4+
import time
5+
import pyarrow as pa
6+
from datafusion import SessionContext, col, SessionConfig
7+
from datafusion import functions as f
8+
9+
def benchmark_with_different_configs():
10+
"""Benchmark with different target_partitions configurations."""
11+
print("=== Benchmarking Different target_partitions Settings ===")
12+
13+
n_batches = 100
14+
batch_size = 1_000_000
15+
16+
# Test different target_partitions settings
17+
configs = [
18+
(1, "target_partitions=1"),
19+
(4, "target_partitions=4"),
20+
(8, "target_partitions=8"),
21+
(16, "target_partitions=16"),
22+
(None, "default (10)")
23+
]
24+
25+
for target_partitions, description in configs:
26+
print(f"\n--- {description} ---")
27+
28+
if target_partitions is None:
29+
ctx = SessionContext()
30+
else:
31+
config = SessionConfig({"datafusion.execution.target_partitions": str(target_partitions)})
32+
ctx = SessionContext(config)
33+
34+
# Create data
35+
batches = []
36+
for i in range(n_batches):
37+
start = i * batch_size
38+
arr = pa.array(range(start, start + batch_size))
39+
batches.append(pa.record_batch([arr], names=["a"]))
40+
41+
# Test with 8 input partitions
42+
partition_size = math.ceil(len(batches) / 8)
43+
partitions = [
44+
batches[i : i + partition_size] for i in range(0, len(batches), partition_size)
45+
]
46+
df = ctx.create_dataframe(partitions)
47+
df_agg = df.aggregate([], [f.sum(col("a"))])
48+
49+
# Show execution plan
50+
plan_str = str(df_agg.explain())
51+
# Extract the DataSourceExec line
52+
for line in plan_str.split('\n'):
53+
if 'DataSourceExec:' in line:
54+
print(f" {line.strip()}")
55+
break
56+
57+
# Benchmark
58+
start = time.perf_counter()
59+
result = df_agg.collect()
60+
duration = time.perf_counter() - start
61+
print(f" Time: {duration:.3f}s")
62+
63+
def test_no_repartitioning():
64+
"""Test performance when avoiding repartitioning."""
65+
print("\n=== Testing Without Repartitioning ===")
66+
67+
# Use target_partitions that matches our input partitions
68+
config = SessionConfig({"datafusion.execution.target_partitions": "8"})
69+
ctx = SessionContext(config)
70+
71+
n_batches = 100
72+
batch_size = 1_000_000
73+
74+
# Create data with exactly 8 partitions
75+
batches = []
76+
for i in range(n_batches):
77+
start = i * batch_size
78+
arr = pa.array(range(start, start + batch_size))
79+
batches.append(pa.record_batch([arr], names=["a"]))
80+
81+
partition_size = math.ceil(len(batches) / 8)
82+
partitions = [
83+
batches[i : i + partition_size] for i in range(0, len(batches), partition_size)
84+
]
85+
df = ctx.create_dataframe(partitions)
86+
df_agg = df.aggregate([], [f.sum(col("a"))])
87+
88+
print("Execution plan:")
89+
print(df_agg.explain())
90+
91+
# Benchmark multiple times
92+
times = []
93+
for i in range(3):
94+
start = time.perf_counter()
95+
result = df_agg.collect()
96+
duration = time.perf_counter() - start
97+
times.append(duration)
98+
print(f"Run {i+1}: {duration:.3f}s")
99+
100+
avg_time = sum(times) / len(times)
101+
print(f"Average: {avg_time:.3f}s")
102+
103+
if __name__ == "__main__":
104+
benchmark_with_different_configs()
105+
test_no_repartitioning()

0 commit comments

Comments
 (0)