If the task were interpreted with the assumption that input CSV files are small (similar to the provided examples), the solution would be straightforward: load both CSV files fully into memory, join records by driver, compute all metrics, sort results, and select the top three drivers per metric.
However, the task description does not specify any constraints on input size, and given that Sky commonly works with large-scale data, the solution was designed under the assumption that CSV files may be very large (potentially gigabytes). For this reason, the implementation avoids full in-memory loading and instead processes data in a streaming fashion.
Using a distributed data-processing framework such as Spark would be a natural fit for this problem, but the task explicitly forbids such libraries. As a result, a custom streaming workflow was implemented using fs2, inspired by MapReduce-style processing.
The core idea is to treat incoming data as a stream of DriverMessage events and route them to worker instances responsible for per-driver aggregation.
The processing pipeline consists of the following stages:
Pit stop records are streamed first and routed to workers based on driver ID. For each driver, pit stop data is incrementally aggregated (average and minimum pit stop time), so individual pit stop records do not need to be retained. The memory footprint of this stage is bounded to one aggregated pit-stop state per driver, rather than all pit stop records.
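The incremental aggregation described above can be sketched as follows. The name `PitAgg` and its shape are illustrative assumptions, not the actual implementation; the point is that per-driver state is a constant-size record, never a list of pit stops.

```scala
// Illustrative sketch: per-driver pit-stop aggregation keeping only a
// running count, sum, and minimum -- individual records are discarded.
final case class PitAgg(count: Int, sum: Double, min: Double) {
  def add(pitSeconds: Double): PitAgg =
    PitAgg(count + 1, sum + pitSeconds, math.min(min, pitSeconds))
  def average: Double = sum / count
}

object PitAgg {
  def first(pitSeconds: Double): PitAgg = PitAgg(1, pitSeconds, pitSeconds)
}
```

Folding a driver's pit stops through `add` yields the average and minimum in O(1) memory per driver, which is what bounds this stage's footprint.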
Lap data is then streamed. Once a complete lap record for a driver is parsed (one CSV row), all required metrics for that driver can be calculated immediately. After metrics are emitted downstream, all in-memory state associated with that driver (lap data and pit aggregation) is released. This ensures that lap data is never accumulated across drivers, preventing unbounded memory growth.
Metrics are emitted when lap data for a driver is processed. If pit stop data exists for a driver but no lap data is ever received, pit-only metrics are emitted when the stream completes, with lap-based metrics left empty.
Driver metrics are streamed into a final aggregation stage that retains only the best N drivers per metric. Instead of sorting all results, a bounded priority queue per metric discards worse results immediately, keeping memory usage at O(metrics × N) rather than proportional to the total number of drivers. For ties, results are ordered deterministically using the driver identifier as a secondary key.
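A minimal sketch of such a bounded top-N accumulator, assuming smaller metric values are better (the class name `TopN` is hypothetical):

```scala
import scala.collection.mutable

// Illustrative sketch: retains at most n (value, driverId) pairs.
// The worst retained entry (largest value, then largest driverId)
// sits at the head of the priority queue, so it can be evicted in
// O(log n) as soon as a better candidate arrives.
final class TopN(n: Int) {
  private val ord: Ordering[(Double, String)] =
    Ordering.Tuple2(Ordering.Double.TotalOrdering, Ordering.String)
  private val heap = mutable.PriorityQueue.empty[(Double, String)](ord)

  def offer(value: Double, driverId: String): Unit = {
    heap.enqueue((value, driverId))
    if (heap.size > n) heap.dequeue() // discard the worst immediately
  }

  // Best-first and deterministic: value ascending, driverId tiebreak.
  def result: List[(Double, String)] = heap.toList.sorted(ord)
}
```

Memory stays at O(n) per metric regardless of how many drivers are offered, and the secondary ordering on driver ID makes tie-breaking reproducible across runs.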
The final Top-N results are formatted and:
- logged to the console (via the logging subsystem)
- written to a CSV file at the configured output path
This architecture allows the application to process large datasets efficiently while respecting the task constraints and without relying on distributed processing frameworks.
Data processing is organized in a parallel, sharded manner. Incoming stream elements are routed to worker instances based on a sharding function derived from the driver identifier.
Specifically:
- Each incoming DriverMessage is assigned to a worker by computing a shard index from the driver ID (via a stable hash function).
- This guarantees that all messages for the same driver are always routed to the same worker.
As a result:
- Per-driver state (pit stop aggregation and lap data) can be safely maintained without synchronization.
- Workers can process different drivers fully in parallel.
- The system achieves horizontal scalability while preserving correctness.
This approach mirrors the shuffle-and-reduce phase of MapReduce-style systems.
The use of bounded queues between the router and workers introduces backpressure, ensuring that if workers are saturated, upstream file reading slows down rather than accumulating unbounded in-memory data.
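The routing rule itself reduces to a stable hash of the driver ID taken modulo the worker count. The concrete hash function below (FNV-1a) is an assumed choice for illustration; the implementation may use a different stable hash, but any deterministic function of the driver ID preserves the same-driver-same-worker guarantee.

```scala
// Illustrative sketch of shard routing: hash the driver ID with a
// fixed, well-specified hash (FNV-1a, assumed here) and take the
// result modulo the number of workers.
def shardOf(driverId: String, workerCount: Int): Int = {
  val fnvOffset = 0xcbf29ce484222325L
  val fnvPrime  = 0x100000001b3L
  val h = driverId.getBytes("UTF-8").foldLeft(fnvOffset) { (acc, b) =>
    (acc ^ (b & 0xffL)) * fnvPrime
  }
  // floorMod keeps the index non-negative even for negative hashes
  math.floorMod(h, workerCount.toLong).toInt
}
```

Because `shardOf` is a pure function of the driver ID, every message for a given driver deterministically lands on the same worker, which is what makes lock-free per-driver state safe.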
sbt compile
You can run the application directly using sbt run, passing command-line arguments after --:
sbt "run --laps path/to/lap_times.csv --pits path/to/pit_times.csv --top 3 --out top_n.csv"
A plain sbt run uses the default input files lap_times.csv and pit_times.csv and writes results to top_n.csv.
- --laps – path to lap_times.csv (optional, default from config)
- --pits – path to pit_times.csv (optional, default from config)
- --top – number of top drivers per metric (optional, default from config)
- --out – output CSV path (optional, default from config)
Configuration defaults are loaded from application.conf and can be overridden via CLI arguments.
sbt test
- Test fixtures use temporary directories and do not require external setup.
- All file I/O is streaming; large CSV files can be processed without loading them fully into memory.
- Logging output is written via SLF4J / Logback and appears in the console during execution.
- Input CSV files may be very large (up to multiple gigabytes); therefore, all file I/O is performed in a streaming fashion.
- CSV files are expected to always contain a header row, which is skipped during parsing.
- Driver identifiers:
- are assumed to be non-empty strings
- are assumed to be already trimmed
- Lap identifiers are assumed to be positive integers (> 0).
- All time-related values (lap times, pit stop times) are assumed to be positive numbers.
- Any non-positive or non-numeric value is treated as malformed input.
- pit_times.csv rows are considered valid only if they contain exactly three columns (lapId, driverId, pitSeconds); rows with an unexpected number of columns are skipped.
- If pit_times.csv contains duplicate pit stop records for the same driver and lap, only the first occurrence is considered.
- In lap_times.csv, lap times for a driver are parsed until the first malformed or empty lap value; all subsequent lap values on that row are ignored.
- The implementation assumes each driver appears only once in lap_times.csv; if duplicate driver rows appear, only the first row uses any collected pit data and subsequent rows are treated as lap-only.
- Pit stop metrics are emitted even if lap data for a driver is absent; lap-based metrics are None in that case.
- The implementation does not perform semantic validation across datasets, for example:
- a pit stop referencing lap N when lap N does not exist in lap_times.csv
- multiple pit stops on the same lap
- Average lap time and fastest lap time are computed from all parsed lap times, including pit laps.
- The average non-pit lap time currently:
- excludes pit laps
- includes the first lap and out-laps, as implied by the task description
(note: excluding these would likely improve accuracy; see TODOs).
- Pit stop loss is not calculated if the pit stop occurs on the last lap, since no out-lap exists.
- Console output is produced via the logging subsystem (Logback / SLF4J), not via direct stdout printing (println).
- CSV output is written with:
- a fixed header: metric,rank,driver,value_sec
- values formatted to three decimal places
- deterministic ordering using (metric, rank, driverId).
- Although file I/O is streaming, each CSV row is materialized in memory as a collection of strings during parsing.
- The workflow assumes that all pit stop data is processed before lap data.
- During the pit-processing phase, the in-memory driver state map may grow proportionally to the number of distinct drivers present in pit_times.csv, until corresponding lap data is processed.
- If the pit-first assumption is violated, late-arriving pit data (after lap metrics are emitted) will not affect already computed lap results, but can produce a separate pit-only metrics entry at stream end.
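The pit-row validation rules listed above (exactly three columns, positive identifiers and times, malformed rows skipped) can be sketched as a small parser. The names `PitRecord` and `parsePitRow` are hypothetical, chosen only to illustrate the rules:

```scala
// Illustrative sketch of pit-row validation: a row is valid only if it
// has exactly three columns and all values satisfy the invariants
// (lapId > 0, non-empty driverId, pitSeconds > 0). Anything else
// yields None and the row is skipped.
final case class PitRecord(lapId: Int, driverId: String, pitSeconds: Double)

def parsePitRow(line: String): Option[PitRecord] =
  line.split(",", -1) match {
    case Array(lap, driver, secs) =>
      for {
        lapId <- lap.toIntOption.filter(_ > 0)
        d     <- Option(driver).filter(_.nonEmpty)
        s     <- secs.toDoubleOption.filter(_ > 0)
      } yield PitRecord(lapId, d, s)
    case _ => None // unexpected column count: skip the row
  }
```

Returning Option rather than throwing keeps malformed rows from interrupting the stream, matching the skip-and-continue behaviour described above.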
- Introduce Refined types (for example, PositiveDouble and NonEmptyString) to enforce domain invariants at the type level.
- Add a test verifying that summary results are emitted to the console output (logging sink).
- Improve pit-loss baseline calculation by excluding:
- the first lap
- out-laps following pit stops
- Consider optional semantic validation between pit data and lap data (configurable strict mode).
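As a sketch of the Refined-types TODO, the same invariant can be enforced today with a plain smart constructor; this is an alternative to the refined library, shown only to illustrate the intent:

```scala
// Illustrative sketch: a value class whose constructor is private, so
// the only way to obtain a PositiveDouble is through the validating
// factory. This enforces "positive, non-NaN" at the type level.
final case class PositiveDouble private (value: Double) extends AnyVal

object PositiveDouble {
  def from(d: Double): Option[PositiveDouble] =
    if (d > 0 && !d.isNaN) Some(new PositiveDouble(d)) else None
}
```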