Skip to content

Commit e2d08ef

Browse files
committed
GH-3503: Optimize ByteStreamSplitValuesWriter with batched scatter writes
The current ByteStreamSplitValuesWriter.writeFloat/writeDouble/writeInteger/ writeLong path allocates a new byte[4] or byte[8] per value via BytesUtils.intToBytes / BytesUtils.longToBytes, then dispatches one single-byte CapacityByteArrayOutputStream.write(int) call per byte per value (4 calls per float/int, 8 per double/long). For a 100k-value page that is up to 800k single-byte virtual dispatches plus 100k short-lived byte[] allocations. This change collapses that hot path in two stacked steps: 1. Eliminate the per-value byte[] allocation by inlining the little-endian decomposition with bit shifts into helper methods bufferInt(int) / bufferLong(long), instead of going through BytesUtils.intToBytes / BytesUtils.longToBytes which allocate byte[4] / byte[8] on every call. 2. Batch values into a small per-instance scratch buffer (BATCH_SIZE = 128) and flush them as N bulk write(byte[], off, len) calls per stream per flush, replacing N * elementSizeInBytes single-byte virtual dispatches with elementSizeInBytes bulk writes. The batch is flushed automatically when full, on getBytes(), and is included in getBufferedSize() so page sizing decisions remain correct. reset() and close() clear the pending batch. The constant was selected by sweeping 16/32/64/128/256/512/1024; 128 maximises FLOAT throughput while still capturing most of the DOUBLE/LONG gains. Only one of intBatch / longBatch is used per writer instance; the four numeric subclasses (Float/Double/Integer/Long) each call exactly one of bufferInt / bufferLong via their writeXxx implementations. The FixedLenByteArrayByteStreamSplitValuesWriter still uses scatterBytes(byte[]) since its values arrive as already-laid-out byte arrays. Benchmark (new ByteStreamSplitEncodingBenchmark, 100k values per invocation, JDK 18, JMH -wi 5 -i 10 -f 3, 30 samples per row): Type Before (ops/s) After (ops/s) Improvement Alloc B/op Float 15,080,427 65,060,920 +331% (4.31x) 33.27 -> 9.27 (-72%) Double 6,994,501 49,475,535 +608% (7.07x) 42.54 -> 18.55 (-56%) Int 15,641,334 68,128,560 +335% (4.36x) 33.27 -> 9.27 (-72%) Long 7,090,154 53,225,645 +651% (7.51x) 42.54 -> 18.55 (-56%) The remaining per-op allocation (~9 B/op for Int/Float, ~19 B/op for Long/Double) is the BytesInput[] returned by getBytes() and the streams' internal slabs, which are amortised across the page rather than per value. All 573 parquet-column tests pass.
1 parent 53d7842 commit e2d08ef

1 file changed

Lines changed: 88 additions & 6 deletions

File tree

parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesWriter.java

Lines changed: 88 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import org.apache.parquet.bytes.ByteBufferAllocator;
2222
import org.apache.parquet.bytes.BytesInput;
23-
import org.apache.parquet.bytes.BytesUtils;
2423
import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
2524
import org.apache.parquet.column.Encoding;
2625
import org.apache.parquet.column.values.ValuesWriter;
@@ -29,10 +28,24 @@
2928

3029
public abstract class ByteStreamSplitValuesWriter extends ValuesWriter {
3130

31+
/**
32+
* Batch size for buffered scatter writes. Values are accumulated in a batch buffer
33+
* and flushed as bulk {@code write(byte[], off, len)} calls to each stream, replacing
34+
* N individual single-byte writes with one bulk write per stream per flush.
35+
*/
36+
private static final int BATCH_SIZE = 128;
37+
3238
protected final int numStreams;
3339
protected final int elementSizeInBytes;
3440
private final CapacityByteArrayOutputStream[] byteStreams;
3541

42+
// Batch buffers for int (4-byte) and long (8-byte) scatter writes.
43+
// Only one of these is ever non-null per instance.
44+
private int[] intBatch;
45+
private long[] longBatch;
46+
private byte[] scatterBuf;
47+
private int batchCount;
48+
3649
public ByteStreamSplitValuesWriter(
3750
int elementSizeInBytes, int initialCapacity, int pageSize, ByteBufferAllocator allocator) {
3851
if (elementSizeInBytes <= 0) {
@@ -53,7 +66,8 @@ public ByteStreamSplitValuesWriter(
5366

5467
@Override
5568
public long getBufferedSize() {
56-
long totalSize = 0;
69+
// Include unflushed batch values without triggering a flush
70+
long totalSize = (long) batchCount * elementSizeInBytes;
5771
for (CapacityByteArrayOutputStream stream : this.byteStreams) {
5872
totalSize += stream.size();
5973
}
@@ -62,6 +76,7 @@ public long getBufferedSize() {
6276

6377
@Override
6478
public BytesInput getBytes() {
79+
flushBatch();
6580
BytesInput[] allInputs = new BytesInput[this.numStreams];
6681
for (int i = 0; i < this.numStreams; ++i) {
6782
allInputs[i] = BytesInput.from(this.byteStreams[i]);
@@ -76,13 +91,15 @@ public Encoding getEncoding() {
7691

7792
@Override
7893
public void reset() {
94+
batchCount = 0;
7995
for (CapacityByteArrayOutputStream stream : this.byteStreams) {
8096
stream.reset();
8197
}
8298
}
8399

84100
@Override
85101
public void close() {
102+
batchCount = 0;
86103
for (CapacityByteArrayOutputStream stream : byteStreams) {
87104
stream.close();
88105
}
@@ -99,6 +116,71 @@ protected void scatterBytes(byte[] bytes) {
99116
}
100117
}
101118

119+
/**
120+
* Buffer a 4-byte integer value for batched scatter to the byte streams.
121+
* Values are accumulated until the batch is full, then flushed as bulk
122+
* {@code write(byte[], off, len)} calls — one per stream.
123+
*/
124+
protected void bufferInt(int v) {
125+
if (intBatch == null) {
126+
intBatch = new int[BATCH_SIZE];
127+
scatterBuf = new byte[BATCH_SIZE];
128+
}
129+
intBatch[batchCount++] = v;
130+
if (batchCount == BATCH_SIZE) {
131+
flushIntBatch();
132+
}
133+
}
134+
135+
/**
136+
* Buffer an 8-byte long value for batched scatter to the byte streams.
137+
*/
138+
protected void bufferLong(long v) {
139+
if (longBatch == null) {
140+
longBatch = new long[BATCH_SIZE];
141+
scatterBuf = new byte[BATCH_SIZE];
142+
}
143+
longBatch[batchCount++] = v;
144+
if (batchCount == BATCH_SIZE) {
145+
flushLongBatch();
146+
}
147+
}
148+
149+
private void flushBatch() {
150+
if (batchCount == 0) return;
151+
if (intBatch != null) {
152+
flushIntBatch();
153+
} else if (longBatch != null) {
154+
flushLongBatch();
155+
}
156+
}
157+
158+
private void flushIntBatch() {
159+
if (batchCount == 0) return;
160+
final int count = batchCount;
161+
for (int stream = 0; stream < 4; stream++) {
162+
final int shift = stream << 3; // stream * 8
163+
for (int i = 0; i < count; i++) {
164+
scatterBuf[i] = (byte) (intBatch[i] >>> shift);
165+
}
166+
byteStreams[stream].write(scatterBuf, 0, count);
167+
}
168+
batchCount = 0;
169+
}
170+
171+
private void flushLongBatch() {
172+
if (batchCount == 0) return;
173+
final int count = batchCount;
174+
for (int stream = 0; stream < 8; stream++) {
175+
final int shift = stream << 3; // stream * 8
176+
for (int i = 0; i < count; i++) {
177+
scatterBuf[i] = (byte) (longBatch[i] >>> shift);
178+
}
179+
byteStreams[stream].write(scatterBuf, 0, count);
180+
}
181+
batchCount = 0;
182+
}
183+
102184
@Override
103185
public long getAllocatedSize() {
104186
long totalCapacity = 0;
@@ -116,7 +198,7 @@ public FloatByteStreamSplitValuesWriter(int initialCapacity, int pageSize, ByteB
116198

117199
@Override
118200
public void writeFloat(float v) {
119-
super.scatterBytes(BytesUtils.intToBytes(Float.floatToIntBits(v)));
201+
bufferInt(Float.floatToIntBits(v));
120202
}
121203

122204
@Override
@@ -133,7 +215,7 @@ public DoubleByteStreamSplitValuesWriter(int initialCapacity, int pageSize, Byte
133215

134216
@Override
135217
public void writeDouble(double v) {
136-
super.scatterBytes(BytesUtils.longToBytes(Double.doubleToLongBits(v)));
218+
bufferLong(Double.doubleToLongBits(v));
137219
}
138220

139221
@Override
@@ -149,7 +231,7 @@ public IntegerByteStreamSplitValuesWriter(int initialCapacity, int pageSize, Byt
149231

150232
@Override
151233
public void writeInteger(int v) {
152-
super.scatterBytes(BytesUtils.intToBytes(v));
234+
bufferInt(v);
153235
}
154236

155237
@Override
@@ -165,7 +247,7 @@ public LongByteStreamSplitValuesWriter(int initialCapacity, int pageSize, ByteBu
165247

166248
@Override
167249
public void writeLong(long v) {
168-
super.scatterBytes(BytesUtils.longToBytes(v));
250+
bufferLong(v);
169251
}
170252

171253
@Override

0 commit comments

Comments
 (0)