1818//! Defines common code used in execution plans
1919
2020use std:: fs;
21- use std:: fs:: { metadata, File } ;
22- use std:: path:: { Path , PathBuf } ;
21+ use std:: fs:: metadata;
2322use std:: sync:: Arc ;
2423
2524use super :: SendableRecordBatchStream ;
@@ -28,10 +27,9 @@ use crate::{ColumnStatistics, Statistics};
2827
2928use arrow:: array:: Array ;
3029use arrow:: datatypes:: Schema ;
31- use arrow:: ipc:: writer:: { FileWriter , IpcWriteOptions } ;
3230use arrow:: record_batch:: RecordBatch ;
3331use datafusion_common:: stats:: Precision ;
34- use datafusion_common:: { plan_err, DataFusionError , Result } ;
32+ use datafusion_common:: { plan_err, Result } ;
3533use datafusion_execution:: memory_pool:: MemoryReservation ;
3634
3735use futures:: { StreamExt , TryStreamExt } ;
@@ -180,77 +178,6 @@ pub fn compute_record_batch_statistics(
180178 }
181179}
182180
183- /// Write in Arrow IPC File format.
184- pub struct IPCWriter {
185- /// Path
186- pub path : PathBuf ,
187- /// Inner writer
188- pub writer : FileWriter < File > ,
189- /// Batches written
190- pub num_batches : usize ,
191- /// Rows written
192- pub num_rows : usize ,
193- /// Bytes written
194- pub num_bytes : usize ,
195- }
196-
197- impl IPCWriter {
198- /// Create new writer
199- pub fn new ( path : & Path , schema : & Schema ) -> Result < Self > {
200- let file = File :: create ( path) . map_err ( |e| {
201- DataFusionError :: Execution ( format ! (
202- "Failed to create partition file at {path:?}: {e:?}"
203- ) )
204- } ) ?;
205- Ok ( Self {
206- num_batches : 0 ,
207- num_rows : 0 ,
208- num_bytes : 0 ,
209- path : path. into ( ) ,
210- writer : FileWriter :: try_new ( file, schema) ?,
211- } )
212- }
213-
214- /// Create new writer with IPC write options
215- pub fn new_with_options (
216- path : & Path ,
217- schema : & Schema ,
218- write_options : IpcWriteOptions ,
219- ) -> Result < Self > {
220- let file = File :: create ( path) . map_err ( |e| {
221- DataFusionError :: Execution ( format ! (
222- "Failed to create partition file at {path:?}: {e:?}"
223- ) )
224- } ) ?;
225- Ok ( Self {
226- num_batches : 0 ,
227- num_rows : 0 ,
228- num_bytes : 0 ,
229- path : path. into ( ) ,
230- writer : FileWriter :: try_new_with_options ( file, schema, write_options) ?,
231- } )
232- }
233- /// Write one single batch
234- pub fn write ( & mut self , batch : & RecordBatch ) -> Result < ( ) > {
235- self . writer . write ( batch) ?;
236- self . num_batches += 1 ;
237- self . num_rows += batch. num_rows ( ) ;
238- let num_bytes: usize = batch. get_array_memory_size ( ) ;
239- self . num_bytes += num_bytes;
240- Ok ( ( ) )
241- }
242-
243- /// Finish the writer
244- pub fn finish ( & mut self ) -> Result < ( ) > {
245- self . writer . finish ( ) . map_err ( Into :: into)
246- }
247-
248- /// Path write to
249- pub fn path ( & self ) -> & Path {
250- & self . path
251- }
252- }
253-
254181/// Checks if the given projection is valid for the given schema.
255182pub fn can_project (
256183 schema : & arrow:: datatypes:: SchemaRef ,
0 commit comments