Skip to content

Commit 941221f

Browse files
committed
Implement analyze_execution_plan and benchmark_partition_strategies functions for partitioning analysis
1 parent 116047e commit 941221f

1 file changed

Lines changed: 78 additions & 0 deletions

File tree

analyze_threading.py

Lines changed: 78 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,78 @@
1+
#!/usr/bin/env python3
2+
3+
import math
4+
import time
5+
import pyarrow as pa
6+
from datafusion import SessionContext, col
7+
from datafusion import functions as f
8+
9+
def analyze_execution_plan(n_batches=100, batch_size=1_000_000, n_partitions=8):
    """Print the execution plan of a global ``sum(a)`` over partitioned data.

    Generates ``n_batches`` single-column record batches (column ``a`` holds
    consecutive integers), groups them into contiguous partitions, loads them
    into a fresh ``SessionContext`` and prints the plan of the aggregation.

    Args:
        n_batches: Number of record batches to generate (must be >= 1).
        batch_size: Number of rows in each record batch.
        n_partitions: Requested number of partitions (must be >= 1). Batches
            are sliced in chunks of ``ceil(n_batches / n_partitions)``, so the
            resulting partition count can be lower than requested; the
            "Partition sizes" line shows what was actually built.

    Raises:
        ValueError: If ``n_batches`` or ``n_partitions`` is less than 1.
    """
    # Fail early with a clear message instead of a ZeroDivisionError or a
    # "range() arg 3 must not be zero" error further down.
    if n_batches < 1:
        raise ValueError(f"n_batches must be >= 1, got {n_batches}")
    if n_partitions < 1:
        raise ValueError(f"n_partitions must be >= 1, got {n_partitions}")

    ctx = SessionContext()
    batches = [
        pa.record_batch(
            [pa.array(range(i * batch_size, (i + 1) * batch_size))],
            names=["a"],
        )
        for i in range(n_batches)
    ]

    partition_size = math.ceil(len(batches) / n_partitions)
    partitions = [
        batches[i : i + partition_size]
        for i in range(0, len(batches), partition_size)
    ]
    df = ctx.create_dataframe(partitions)
    df_agg = df.aggregate([], [f.sum(col("a"))])

    print(f"Configuration: {n_batches} batches, {n_partitions} partitions")
    print(f"Partition sizes: {[len(p) for p in partitions]}")
    print("Execution plan:")
    # DataFrame.explain() prints the plan itself and returns None, so wrapping
    # it in print() would emit a stray "None" line after the plan.
    df_agg.explain()
    print("-" * 80)
30+
31+
def test_different_partition_configs():
    """Print execution plans for 100 batches under three partition counts."""
    print("=== Testing Different Partition Configurations ===")

    # Order is kept from the original script: one batch per partition (100),
    # few large partitions (8), then a medium split (25).
    for partition_count in (100, 8, 25):
        analyze_execution_plan(n_batches=100, n_partitions=partition_count)
43+
44+
def benchmark_partition_strategies():
    """Time a global ``sum(a)`` under several partition layouts.

    For each ``(n_batches, n_partitions, description)`` configuration the
    batches are grouped into contiguous partitions, loaded into a fresh
    ``SessionContext``, and the wall-clock time of running the aggregation to
    completion is printed. Only the aggregate + collect step is timed; data
    generation and DataFrame creation are excluded.
    """
    print("=== Benchmarking Different Strategies ===")

    rows_per_batch = 1_000_000
    configs = [
        (100, 100, "1 batch per partition"),
        (100, 25, "4 batches per partition"),
        # ceil(100 / 8) == 13, which yields seven partitions of 13 batches and
        # one of 9, not a 12/13 split.
        (100, 8, "up to 13 batches per partition"),
        (100, 4, "25 batches per partition"),
        (100, 1, "All batches in 1 partition"),
    ]

    # The generated batches depend only on the batch count, so build them once
    # per distinct count instead of re-materialising them for every config.
    batches_by_count = {}

    for n_batches, n_partitions, description in configs:
        if n_batches not in batches_by_count:
            batches_by_count[n_batches] = [
                pa.record_batch(
                    [pa.array(range(i * rows_per_batch, (i + 1) * rows_per_batch))],
                    names=["a"],
                )
                for i in range(n_batches)
            ]
        batches = batches_by_count[n_batches]

        partition_size = math.ceil(len(batches) / n_partitions)
        partitions = [
            batches[i : i + partition_size]
            for i in range(0, len(batches), partition_size)
        ]

        # A fresh context per configuration keeps the runs independent.
        ctx = SessionContext()
        df = ctx.create_dataframe(partitions)

        started = time.perf_counter()
        df.aggregate([], [f.sum(col("a"))]).collect()
        duration = time.perf_counter() - started

        print(f"{description}: {duration:.3f}s")
75+
76+
if __name__ == "__main__":
    # Script entry point: show the plans for each layout first, then time them.
    test_different_partition_configs()
    benchmark_partition_strategies()

0 commit comments

Comments
 (0)