Skip to content

Commit 7abde9b

Browse files
Fix fsspec race (#316)
Fix a race in fsspec that sometimes crashed tests in CI on Linux
2 parents 43fce0f + 8c3bb39 commit 7abde9b

2 files changed

Lines changed: 60 additions & 6 deletions

File tree

src/duckdb_py/pyfilesystem.cpp

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,11 @@ int64_t PythonFilesystem::Write(FileHandle &handle, void *buffer, int64_t nr_byt
9898
return py::int_(write(data));
9999
}
100100
void PythonFilesystem::Write(FileHandle &handle, void *buffer, int64_t nr_bytes, idx_t location) {
101-
Seek(handle, location);
102-
103-
Write(handle, buffer, nr_bytes);
101+
PythonGILWrapper gil;
102+
auto &py_handle = PythonFileHandle::GetHandle(handle);
103+
py_handle.attr("seek")(location);
104+
auto data = py::bytes(std::string(const_char_ptr_cast(buffer), nr_bytes));
105+
py_handle.attr("write")(data);
104106
}
105107

106108
int64_t PythonFilesystem::Read(FileHandle &handle, void *buffer, int64_t nr_bytes) {
@@ -116,9 +118,11 @@ int64_t PythonFilesystem::Read(FileHandle &handle, void *buffer, int64_t nr_byte
116118
}
117119

118120
void PythonFilesystem::Read(duckdb::FileHandle &handle, void *buffer, int64_t nr_bytes, uint64_t location) {
119-
Seek(handle, location);
120-
121-
Read(handle, buffer, nr_bytes);
121+
PythonGILWrapper gil;
122+
auto &py_handle = PythonFileHandle::GetHandle(handle);
123+
py_handle.attr("seek")(location);
124+
string data = py::bytes(py_handle.attr("read")(nr_bytes));
125+
memcpy(buffer, data.c_str(), data.size());
122126
}
123127
bool PythonFilesystem::FileExists(const string &filename, optional_ptr<FileOpener> opener) {
124128
return Exists(filename, "isfile");

tests/fast/api/test_fsspec.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,3 +53,53 @@ def __init__(self) -> None:
5353

5454
result = duckdb_cursor.read_parquet(file_globs=["deadlock://a", "deadlock://b"], union_by_name=True)
5555
assert len(result.fetchall()) == 100_000
56+
57+
def test_fsspec_seek_read_atomicity(self, duckdb_cursor, tmp_path):
58+
"""Regression test: concurrent positional reads must be atomic (seek+read under one GIL hold).
59+
60+
Without the fix, separate seek and read GIL acquisitions allow another thread to
61+
seek the same handle between them, corrupting data. We stress this by reading 4 files
62+
with distinct data in parallel (union_by_name) and verifying no cross-contamination.
63+
"""
64+
files = {}
65+
for i, name in enumerate(["a", "b", "c", "d"]):
66+
file_path = tmp_path / f"{name}.parquet"
67+
duckdb_cursor.sql(f"COPY (SELECT {i} AS file_id FROM range(10000)) TO '{file_path!s}' (FORMAT parquet)")
68+
files[name] = file_path.read_bytes()
69+
70+
class AtomicityTestFS(fsspec.AbstractFileSystem):
71+
protocol = "atomtest"
72+
73+
@property
74+
def fsid(self):
75+
return "atomtest"
76+
77+
def ls(self, path, detail=True, **kwargs):
78+
vals = [k for k in self._data if k.startswith(path)]
79+
if detail:
80+
return [
81+
{"name": p, "size": len(self._data[p]), "type": "file", "created": 0, "islink": False}
82+
for p in vals
83+
]
84+
return vals
85+
86+
def modified(self, path):
87+
return datetime.datetime.now()
88+
89+
def _open(self, path, **kwargs):
90+
return io.BytesIO(self._data[path])
91+
92+
def __init__(self) -> None:
93+
super().__init__()
94+
self._data = files
95+
96+
fsspec.register_implementation("atomtest", AtomicityTestFS, clobber=True)
97+
duckdb_cursor.register_filesystem(fsspec.filesystem("atomtest"))
98+
99+
globs = ["atomtest://a", "atomtest://b", "atomtest://c", "atomtest://d"]
100+
for _ in range(10):
101+
result = duckdb_cursor.sql(
102+
f"SELECT file_id, count(*) AS cnt FROM read_parquet({globs}, union_by_name=true) "
103+
"GROUP BY ALL ORDER BY file_id"
104+
).fetchall()
105+
assert result == [(0, 10000), (1, 10000), (2, 10000), (3, 10000)]

0 commit comments

Comments
 (0)