@@ -496,16 +496,134 @@ duckdb::pyarrow::RecordBatchReader DuckDBPyResult::FetchRecordBatchReader(idx_t
496496 return py::cast<duckdb::pyarrow::RecordBatchReader>(record_batch_reader);
497497}
498498
// Backing storage for the strings exposed by one deep-copied ArrowSchema node.
// ArrowSchemaDeepCopy attaches an instance to the node's private_data and points the
// node's format/name/metadata at these members; ReleaseCopiedArrowSchema deletes it.
struct ArrowSchemaCopyData {
	// Copy of the source node's format string (empty if the source had none).
	string format;
	// Copy of the source node's name (empty if the source had none).
	string name;
	// Raw bytes of the source node's encoded metadata; binary data, may contain NUL bytes.
	string metadata;
};
505+
506+ static void ReleaseCopiedArrowSchema (ArrowSchema *schema) {
507+ if (!schema || !schema->release ) {
508+ return ;
509+ }
510+ for (int64_t i = 0 ; i < schema->n_children ; i++) {
511+ if (schema->children [i]->release ) {
512+ schema->children [i]->release (schema->children [i]);
513+ }
514+ delete schema->children [i];
515+ }
516+ delete[] schema->children ;
517+ if (schema->dictionary ) {
518+ if (schema->dictionary ->release ) {
519+ schema->dictionary ->release (schema->dictionary );
520+ }
521+ delete schema->dictionary ;
522+ }
523+ delete reinterpret_cast <ArrowSchemaCopyData *>(schema->private_data );
524+ schema->release = nullptr ;
525+ }
526+
// Returns the number of bytes occupied by an Arrow-encoded metadata buffer, or 0 when
// `metadata` is null.
//
// Layout walked here: an int32 entry count, followed per entry by
//   int32 key_len, key bytes, int32 value_len, value bytes.
// The buffer carries no total length, so the size is found by hopping over every
// length-prefixed field. All integers are read with memcpy since the buffer is a plain
// char array with no alignment guarantee.
static idx_t ArrowMetadataSize(const char *metadata) {
	if (metadata == nullptr) {
		return 0;
	}
	const char *cursor = metadata;

	int32_t entry_count;
	memcpy(&entry_count, cursor, sizeof(entry_count));
	cursor += sizeof(entry_count);

	// Every entry contributes two length-prefixed fields: its key and its value.
	const int64_t field_count = static_cast<int64_t>(entry_count) * 2;
	for (int64_t field = 0; field < field_count; field++) {
		int32_t field_len;
		memcpy(&field_len, cursor, sizeof(field_len));
		cursor += sizeof(field_len) + field_len;
	}
	return cursor - metadata;
}
546+
547+ // Deep-copy an ArrowSchema. The Arrow C Data Interface specifies that get_schema
548+ // transfers ownership to the caller, so each call must produce an independent copy.
549+ // Each node owns its string data via an ArrowSchemaCopyData in private_data.
550+ static int ArrowSchemaDeepCopy (const ArrowSchema &source, ArrowSchema *out, string &error) {
551+ out->release = nullptr ;
552+ try {
553+ auto data = new ArrowSchemaCopyData ();
554+ data->format = source.format ? source.format : " " ;
555+ data->name = source.name ? source.name : " " ;
556+ if (source.metadata ) {
557+ auto metadata_size = ArrowMetadataSize (source.metadata );
558+ data->metadata .assign (source.metadata , metadata_size);
559+ }
560+
561+ out->format = data->format .c_str ();
562+ out->name = data->name .c_str ();
563+ out->metadata = source.metadata ? data->metadata .data () : nullptr ;
564+ out->flags = source.flags ;
565+ out->n_children = source.n_children ;
566+ out->dictionary = nullptr ;
567+ out->private_data = data;
568+ out->release = ReleaseCopiedArrowSchema;
569+
570+ if (source.n_children > 0 ) {
571+ out->children = new ArrowSchema *[source.n_children ];
572+ for (int64_t i = 0 ; i < source.n_children ; i++) {
573+ out->children [i] = new ArrowSchema ();
574+ auto rc = ArrowSchemaDeepCopy (*source.children [i], out->children [i], error);
575+ if (rc != 0 ) {
576+ for (int64_t j = 0 ; j <= i; j++) {
577+ if (out->children [j]->release ) {
578+ out->children [j]->release (out->children [j]);
579+ }
580+ delete out->children [j];
581+ }
582+ delete[] out->children ;
583+ out->children = nullptr ;
584+ out->n_children = 0 ;
585+ // Release the partially constructed node
586+ delete data;
587+ out->private_data = nullptr ;
588+ out->release = nullptr ;
589+ return rc;
590+ }
591+ }
592+ } else {
593+ out->children = nullptr ;
594+ }
595+
596+ if (source.dictionary ) {
597+ out->dictionary = new ArrowSchema ();
598+ auto rc = ArrowSchemaDeepCopy (*source.dictionary , out->dictionary , error);
599+ if (rc != 0 ) {
600+ delete out->dictionary ;
601+ out->dictionary = nullptr ;
602+ return rc;
603+ }
604+ }
605+ } catch (std::exception &e) {
606+ error = e.what ();
607+ return -1 ;
608+ }
609+ return 0 ;
610+ }
611+
499612// Wraps pre-built Arrow arrays from an ArrowQueryResult into an ArrowArrayStream.
500613// This avoids the double-materialization that happens when using ResultArrowArrayStreamWrapper
501614// with an ArrowQueryResult (which throws NotImplementedException from FetchInternal).
615+ //
616+ // The schema is cached eagerly in the constructor (while the ClientContext is still alive)
617+ // so that get_schema can be called after the originating connection has been destroyed.
618+ // ToArrowSchema needs a live ClientContext for transaction access and catalog lookups
619+ // (e.g. CRS conversion for GEOMETRY types).
502620struct ArrowQueryResultStreamWrapper {
503621 ArrowQueryResultStreamWrapper (unique_ptr<QueryResult> result_p) : result(std::move(result_p)), index(0 ) {
504622 auto &arrow_result = result->Cast <ArrowQueryResult>();
505623 arrays = arrow_result.ConsumeArrays ();
506- types = result-> types ;
507- names = result-> names ;
508- client_properties = result->client_properties ;
624+
625+ cached_schema. release = nullptr ;
626+ ArrowConverter::ToArrowSchema (&cached_schema, result-> types , result->names , result-> client_properties ) ;
509627
510628 stream.private_data = this ;
511629 stream.get_schema = GetSchema;
@@ -514,19 +632,18 @@ struct ArrowQueryResultStreamWrapper {
514632 stream.get_last_error = GetLastError;
515633 }
516634
635+ ~ArrowQueryResultStreamWrapper () {
636+ if (cached_schema.release ) {
637+ cached_schema.release (&cached_schema);
638+ }
639+ }
640+
517641 static int GetSchema (ArrowArrayStream *stream, ArrowSchema *out) {
518642 if (!stream->release ) {
519643 return -1 ;
520644 }
521645 auto self = reinterpret_cast <ArrowQueryResultStreamWrapper *>(stream->private_data );
522- out->release = nullptr ;
523- try {
524- ArrowConverter::ToArrowSchema (out, self->types , self->names , self->client_properties );
525- } catch (std::runtime_error &e) {
526- self->last_error = e.what ();
527- return -1 ;
528- }
529- return 0 ;
646+ return ArrowSchemaDeepCopy (self->cached_schema , out, self->last_error );
530647 }
531648
532649 static int GetNext (ArrowArrayStream *stream, ArrowArray *out) {
@@ -563,14 +680,89 @@ struct ArrowQueryResultStreamWrapper {
563680 ArrowArrayStream stream;
564681 unique_ptr<QueryResult> result;
565682 vector<unique_ptr<ArrowArrayWrapper>> arrays;
566- vector<LogicalType> types;
567- vector<string> names;
568- ClientProperties client_properties;
683+ ArrowSchema cached_schema;
569684 idx_t index;
570685 string last_error;
571686};
572687
573- // Destructor for capsules that own a heap-allocated ArrowArrayStream (slow path).
688+ // Wraps an ArrowArrayStream and caches its schema eagerly.
689+ // Used for the slow path (MaterializedQueryResult / StreamQueryResult) where the
690+ // inner stream is a ResultArrowArrayStreamWrapper from DuckDB core. That wrapper's
691+ // get_schema calls ToArrowSchema which needs a live ClientContext, so we fetch it
692+ // once at construction time and return copies from cache afterwards.
693+ struct SchemaCachingStreamWrapper {
694+ SchemaCachingStreamWrapper (ArrowArrayStream inner_p) : inner(inner_p) {
695+ inner_p.release = nullptr ;
696+
697+ cached_schema.release = nullptr ;
698+ if (inner.get_schema (&inner, &cached_schema)) {
699+ schema_error = inner.get_last_error (&inner);
700+ schema_ok = false ;
701+ } else {
702+ schema_ok = true ;
703+ }
704+
705+ stream.private_data = this ;
706+ stream.get_schema = GetSchema;
707+ stream.get_next = GetNext;
708+ stream.release = Release;
709+ stream.get_last_error = GetLastError;
710+ }
711+
712+ ~SchemaCachingStreamWrapper () {
713+ if (cached_schema.release ) {
714+ cached_schema.release (&cached_schema);
715+ }
716+ if (inner.release ) {
717+ inner.release (&inner);
718+ }
719+ }
720+
721+ static int GetSchema (ArrowArrayStream *stream, ArrowSchema *out) {
722+ if (!stream->release ) {
723+ return -1 ;
724+ }
725+ auto self = reinterpret_cast <SchemaCachingStreamWrapper *>(stream->private_data );
726+ if (!self->schema_ok ) {
727+ return -1 ;
728+ }
729+ return ArrowSchemaDeepCopy (self->cached_schema , out, self->schema_error );
730+ }
731+
732+ static int GetNext (ArrowArrayStream *stream, ArrowArray *out) {
733+ if (!stream->release ) {
734+ return -1 ;
735+ }
736+ auto self = reinterpret_cast <SchemaCachingStreamWrapper *>(stream->private_data );
737+ return self->inner .get_next (&self->inner , out);
738+ }
739+
740+ static void Release (ArrowArrayStream *stream) {
741+ if (!stream || !stream->release ) {
742+ return ;
743+ }
744+ stream->release = nullptr ;
745+ delete reinterpret_cast <SchemaCachingStreamWrapper *>(stream->private_data );
746+ }
747+
748+ static const char *GetLastError (ArrowArrayStream *stream) {
749+ if (!stream->release ) {
750+ return " stream was released" ;
751+ }
752+ auto self = reinterpret_cast <SchemaCachingStreamWrapper *>(stream->private_data );
753+ if (!self->schema_error .empty ()) {
754+ return self->schema_error .c_str ();
755+ }
756+ return self->inner .get_last_error (&self->inner );
757+ }
758+
759+ ArrowArrayStream stream;
760+ ArrowArrayStream inner;
761+ ArrowSchema cached_schema;
762+ bool schema_ok;
763+ string schema_error;
764+ };
765+
574766static void ArrowArrayStreamPyCapsuleDestructor (PyObject *object) {
575767 auto data = PyCapsule_GetPointer (object, " arrow_array_stream" );
576768 if (!data) {
@@ -586,19 +778,19 @@ static void ArrowArrayStreamPyCapsuleDestructor(PyObject *object) {
586778py::object DuckDBPyResult::FetchArrowCapsule (idx_t rows_per_batch) {
587779 if (result && result->type == QueryResultType::ARROW_RESULT) {
588780 // Fast path: yield pre-built Arrow arrays directly.
589- // The wrapper is heap-allocated; Release() deletes it via private_data.
590- // We heap-allocate a separate ArrowArrayStream for the capsule so that the capsule
591- // holds a stable pointer even after the wrapper is consumed and deleted by a scan.
592781 auto wrapper = new ArrowQueryResultStreamWrapper (std::move (result));
593782 auto stream = new ArrowArrayStream ();
594783 *stream = wrapper->stream ;
595784 wrapper->stream .release = nullptr ;
596785 return py::capsule (stream, " arrow_array_stream" , ArrowArrayStreamPyCapsuleDestructor);
597786 }
598- // Existing slow path for MaterializedQueryResult / StreamQueryResult
599- auto stream_p = FetchArrowArrayStream (rows_per_batch);
787+ // Slow path: wrap in SchemaCachingStreamWrapper so the schema is fetched
788+ // eagerly while the ClientContext is still alive.
789+ auto inner_stream = FetchArrowArrayStream (rows_per_batch);
790+ auto wrapper = new SchemaCachingStreamWrapper (inner_stream);
600791 auto stream = new ArrowArrayStream ();
601- *stream = stream_p;
792+ *stream = wrapper->stream ;
793+ wrapper->stream .release = nullptr ;
602794 return py::capsule (stream, " arrow_array_stream" , ArrowArrayStreamPyCapsuleDestructor);
603795}
604796
0 commit comments