Skip to content

Commit ae356cb

Browse files
committed
Cache the arrow schema for streams so we don't need an active client context
1 parent b71639f commit ae356cb

1 file changed

Lines changed: 213 additions & 21 deletions

File tree

src/duckdb_py/pyresult.cpp

Lines changed: 213 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -496,16 +496,134 @@ duckdb::pyarrow::RecordBatchReader DuckDBPyResult::FetchRecordBatchReader(idx_t
496496
return py::cast<duckdb::pyarrow::RecordBatchReader>(record_batch_reader);
497497
}
498498

499+
// Holds owned copies of the string data for a deep-copied ArrowSchema node.
500+
struct ArrowSchemaCopyData {
501+
string format;
502+
string name;
503+
string metadata;
504+
};
505+
506+
static void ReleaseCopiedArrowSchema(ArrowSchema *schema) {
507+
if (!schema || !schema->release) {
508+
return;
509+
}
510+
for (int64_t i = 0; i < schema->n_children; i++) {
511+
if (schema->children[i]->release) {
512+
schema->children[i]->release(schema->children[i]);
513+
}
514+
delete schema->children[i];
515+
}
516+
delete[] schema->children;
517+
if (schema->dictionary) {
518+
if (schema->dictionary->release) {
519+
schema->dictionary->release(schema->dictionary);
520+
}
521+
delete schema->dictionary;
522+
}
523+
delete reinterpret_cast<ArrowSchemaCopyData *>(schema->private_data);
524+
schema->release = nullptr;
525+
}
526+
527+
static idx_t ArrowMetadataSize(const char *metadata) {
528+
if (!metadata) {
529+
return 0;
530+
}
531+
// Arrow metadata format: int32 num_entries, then for each entry:
532+
// int32 key_len, key_bytes, int32 value_len, value_bytes
533+
auto ptr = metadata;
534+
int32_t num_entries;
535+
memcpy(&num_entries, ptr, sizeof(int32_t));
536+
ptr += sizeof(int32_t);
537+
for (int32_t i = 0; i < num_entries; i++) {
538+
int32_t len;
539+
memcpy(&len, ptr, sizeof(int32_t));
540+
ptr += sizeof(int32_t) + len;
541+
memcpy(&len, ptr, sizeof(int32_t));
542+
ptr += sizeof(int32_t) + len;
543+
}
544+
return ptr - metadata;
545+
}
546+
547+
// Deep-copy an ArrowSchema. The Arrow C Data Interface specifies that get_schema
548+
// transfers ownership to the caller, so each call must produce an independent copy.
549+
// Each node owns its string data via an ArrowSchemaCopyData in private_data.
550+
static int ArrowSchemaDeepCopy(const ArrowSchema &source, ArrowSchema *out, string &error) {
551+
out->release = nullptr;
552+
try {
553+
auto data = new ArrowSchemaCopyData();
554+
data->format = source.format ? source.format : "";
555+
data->name = source.name ? source.name : "";
556+
if (source.metadata) {
557+
auto metadata_size = ArrowMetadataSize(source.metadata);
558+
data->metadata.assign(source.metadata, metadata_size);
559+
}
560+
561+
out->format = data->format.c_str();
562+
out->name = data->name.c_str();
563+
out->metadata = source.metadata ? data->metadata.data() : nullptr;
564+
out->flags = source.flags;
565+
out->n_children = source.n_children;
566+
out->dictionary = nullptr;
567+
out->private_data = data;
568+
out->release = ReleaseCopiedArrowSchema;
569+
570+
if (source.n_children > 0) {
571+
out->children = new ArrowSchema *[source.n_children];
572+
for (int64_t i = 0; i < source.n_children; i++) {
573+
out->children[i] = new ArrowSchema();
574+
auto rc = ArrowSchemaDeepCopy(*source.children[i], out->children[i], error);
575+
if (rc != 0) {
576+
for (int64_t j = 0; j <= i; j++) {
577+
if (out->children[j]->release) {
578+
out->children[j]->release(out->children[j]);
579+
}
580+
delete out->children[j];
581+
}
582+
delete[] out->children;
583+
out->children = nullptr;
584+
out->n_children = 0;
585+
// Release the partially constructed node
586+
delete data;
587+
out->private_data = nullptr;
588+
out->release = nullptr;
589+
return rc;
590+
}
591+
}
592+
} else {
593+
out->children = nullptr;
594+
}
595+
596+
if (source.dictionary) {
597+
out->dictionary = new ArrowSchema();
598+
auto rc = ArrowSchemaDeepCopy(*source.dictionary, out->dictionary, error);
599+
if (rc != 0) {
600+
delete out->dictionary;
601+
out->dictionary = nullptr;
602+
return rc;
603+
}
604+
}
605+
} catch (std::exception &e) {
606+
error = e.what();
607+
return -1;
608+
}
609+
return 0;
610+
}
611+
499612
// Wraps pre-built Arrow arrays from an ArrowQueryResult into an ArrowArrayStream.
500613
// This avoids the double-materialization that happens when using ResultArrowArrayStreamWrapper
501614
// with an ArrowQueryResult (which throws NotImplementedException from FetchInternal).
615+
//
616+
// The schema is cached eagerly in the constructor (while the ClientContext is still alive)
617+
// so that get_schema can be called after the originating connection has been destroyed.
618+
// ToArrowSchema needs a live ClientContext for transaction access and catalog lookups
619+
// (e.g. CRS conversion for GEOMETRY types).
502620
struct ArrowQueryResultStreamWrapper {
503621
ArrowQueryResultStreamWrapper(unique_ptr<QueryResult> result_p) : result(std::move(result_p)), index(0) {
504622
auto &arrow_result = result->Cast<ArrowQueryResult>();
505623
arrays = arrow_result.ConsumeArrays();
506-
types = result->types;
507-
names = result->names;
508-
client_properties = result->client_properties;
624+
625+
cached_schema.release = nullptr;
626+
ArrowConverter::ToArrowSchema(&cached_schema, result->types, result->names, result->client_properties);
509627

510628
stream.private_data = this;
511629
stream.get_schema = GetSchema;
@@ -514,19 +632,18 @@ struct ArrowQueryResultStreamWrapper {
514632
stream.get_last_error = GetLastError;
515633
}
516634

635+
~ArrowQueryResultStreamWrapper() {
636+
if (cached_schema.release) {
637+
cached_schema.release(&cached_schema);
638+
}
639+
}
640+
517641
static int GetSchema(ArrowArrayStream *stream, ArrowSchema *out) {
518642
if (!stream->release) {
519643
return -1;
520644
}
521645
auto self = reinterpret_cast<ArrowQueryResultStreamWrapper *>(stream->private_data);
522-
out->release = nullptr;
523-
try {
524-
ArrowConverter::ToArrowSchema(out, self->types, self->names, self->client_properties);
525-
} catch (std::runtime_error &e) {
526-
self->last_error = e.what();
527-
return -1;
528-
}
529-
return 0;
646+
return ArrowSchemaDeepCopy(self->cached_schema, out, self->last_error);
530647
}
531648

532649
static int GetNext(ArrowArrayStream *stream, ArrowArray *out) {
@@ -563,14 +680,89 @@ struct ArrowQueryResultStreamWrapper {
563680
ArrowArrayStream stream;
564681
unique_ptr<QueryResult> result;
565682
vector<unique_ptr<ArrowArrayWrapper>> arrays;
566-
vector<LogicalType> types;
567-
vector<string> names;
568-
ClientProperties client_properties;
683+
ArrowSchema cached_schema;
569684
idx_t index;
570685
string last_error;
571686
};
572687

573-
// Destructor for capsules that own a heap-allocated ArrowArrayStream (slow path).
688+
// Wraps an ArrowArrayStream and caches its schema eagerly.
689+
// Used for the slow path (MaterializedQueryResult / StreamQueryResult) where the
690+
// inner stream is a ResultArrowArrayStreamWrapper from DuckDB core. That wrapper's
691+
// get_schema calls ToArrowSchema which needs a live ClientContext, so we fetch it
692+
// once at construction time and return copies from cache afterwards.
693+
struct SchemaCachingStreamWrapper {
694+
SchemaCachingStreamWrapper(ArrowArrayStream inner_p) : inner(inner_p) {
695+
inner_p.release = nullptr;
696+
697+
cached_schema.release = nullptr;
698+
if (inner.get_schema(&inner, &cached_schema)) {
699+
schema_error = inner.get_last_error(&inner);
700+
schema_ok = false;
701+
} else {
702+
schema_ok = true;
703+
}
704+
705+
stream.private_data = this;
706+
stream.get_schema = GetSchema;
707+
stream.get_next = GetNext;
708+
stream.release = Release;
709+
stream.get_last_error = GetLastError;
710+
}
711+
712+
~SchemaCachingStreamWrapper() {
713+
if (cached_schema.release) {
714+
cached_schema.release(&cached_schema);
715+
}
716+
if (inner.release) {
717+
inner.release(&inner);
718+
}
719+
}
720+
721+
static int GetSchema(ArrowArrayStream *stream, ArrowSchema *out) {
722+
if (!stream->release) {
723+
return -1;
724+
}
725+
auto self = reinterpret_cast<SchemaCachingStreamWrapper *>(stream->private_data);
726+
if (!self->schema_ok) {
727+
return -1;
728+
}
729+
return ArrowSchemaDeepCopy(self->cached_schema, out, self->schema_error);
730+
}
731+
732+
static int GetNext(ArrowArrayStream *stream, ArrowArray *out) {
733+
if (!stream->release) {
734+
return -1;
735+
}
736+
auto self = reinterpret_cast<SchemaCachingStreamWrapper *>(stream->private_data);
737+
return self->inner.get_next(&self->inner, out);
738+
}
739+
740+
static void Release(ArrowArrayStream *stream) {
741+
if (!stream || !stream->release) {
742+
return;
743+
}
744+
stream->release = nullptr;
745+
delete reinterpret_cast<SchemaCachingStreamWrapper *>(stream->private_data);
746+
}
747+
748+
static const char *GetLastError(ArrowArrayStream *stream) {
749+
if (!stream->release) {
750+
return "stream was released";
751+
}
752+
auto self = reinterpret_cast<SchemaCachingStreamWrapper *>(stream->private_data);
753+
if (!self->schema_error.empty()) {
754+
return self->schema_error.c_str();
755+
}
756+
return self->inner.get_last_error(&self->inner);
757+
}
758+
759+
ArrowArrayStream stream;
760+
ArrowArrayStream inner;
761+
ArrowSchema cached_schema;
762+
bool schema_ok;
763+
string schema_error;
764+
};
765+
574766
static void ArrowArrayStreamPyCapsuleDestructor(PyObject *object) {
575767
auto data = PyCapsule_GetPointer(object, "arrow_array_stream");
576768
if (!data) {
@@ -586,19 +778,19 @@ static void ArrowArrayStreamPyCapsuleDestructor(PyObject *object) {
586778
py::object DuckDBPyResult::FetchArrowCapsule(idx_t rows_per_batch) {
587779
if (result && result->type == QueryResultType::ARROW_RESULT) {
588780
// Fast path: yield pre-built Arrow arrays directly.
589-
// The wrapper is heap-allocated; Release() deletes it via private_data.
590-
// We heap-allocate a separate ArrowArrayStream for the capsule so that the capsule
591-
// holds a stable pointer even after the wrapper is consumed and deleted by a scan.
592781
auto wrapper = new ArrowQueryResultStreamWrapper(std::move(result));
593782
auto stream = new ArrowArrayStream();
594783
*stream = wrapper->stream;
595784
wrapper->stream.release = nullptr;
596785
return py::capsule(stream, "arrow_array_stream", ArrowArrayStreamPyCapsuleDestructor);
597786
}
598-
// Existing slow path for MaterializedQueryResult / StreamQueryResult
599-
auto stream_p = FetchArrowArrayStream(rows_per_batch);
787+
// Slow path: wrap in SchemaCachingStreamWrapper so the schema is fetched
788+
// eagerly while the ClientContext is still alive.
789+
auto inner_stream = FetchArrowArrayStream(rows_per_batch);
790+
auto wrapper = new SchemaCachingStreamWrapper(inner_stream);
600791
auto stream = new ArrowArrayStream();
601-
*stream = stream_p;
792+
*stream = wrapper->stream;
793+
wrapper->stream.release = nullptr;
602794
return py::capsule(stream, "arrow_array_stream", ArrowArrayStreamPyCapsuleDestructor);
603795
}
604796

0 commit comments

Comments
 (0)