Skip to content

Commit 560b10e

Browse files
evangelisilvaalamb
andauthored
fix: panic in ListingTableFactory when session is not SessionState (#20139)
## Which issue does this PR close? - Closes #20113. ## Rationale for this change This PR fixes a potential panic in `ListingTableFactory::create` when the provided [Session](cci:2://file:///Users/evangelisilva/.gemini/antigravity/scratch/datafusion/datafusion/session/src/session.rs:71:0-140:1) instance is not a `SessionState`. Previously, the code used `.unwrap()` on `downcast_ref::<SessionState>()`. If a custom [Session](cci:2://file:///Users/evangelisilva/.gemini/antigravity/scratch/datafusion/datafusion/session/src/session.rs:71:0-140:1) implementation was used (which is allowed by the trait), this would cause a crash. This change replaces `.unwrap()` with `ok_or_else`, returning a proper `DataFusionError::Internal` instead. ## What changes are included in this PR? - Replaced `.unwrap()` with `ok_or_else` in `ListingTableFactory::create` to safely handle session downcasting. - Added a regression test [test_create_with_invalid_session](cci:1://file:///Users/evangelisilva/.gemini/antigravity/scratch/datafusion/datafusion/core/src/datasource/listing_table_factory.rs:554:4-638:5) in [datafusion/core/src/datasource/listing_table_factory.rs](cci:7://file:///Users/evangelisilva/.gemini/antigravity/scratch/datafusion/datafusion/core/src/datasource/listing_table_factory.rs:0:0-0:0) that uses a [MockSession](cci:2://file:///Users/evangelisilva/.gemini/antigravity/scratch/datafusion/datafusion/core/src/datasource/listing_table_factory.rs:570:8-570:27) to verify the error is returned instead of panicking. ## Are these changes tested? Yes. - Added new unit test [test_create_with_invalid_session](cci:1://file:///Users/evangelisilva/.gemini/antigravity/scratch/datafusion/datafusion/core/src/datasource/listing_table_factory.rs:554:4-638:5). - Ran `cargo test -p datafusion --lib datasource::listing_table_factory::tests::test_create_with_invalid_session` and it passed. ## Are there any user-facing changes? No. --------- Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent d544735 commit 560b10e

1 file changed

Lines changed: 108 additions & 1 deletion

File tree

datafusion/core/src/datasource/listing_table_factory.rs

Lines changed: 108 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,15 @@ impl TableProviderFactory for ListingTableFactory {
5454
cmd: &CreateExternalTable,
5555
) -> Result<Arc<dyn TableProvider>> {
5656
// TODO (https://github.com/apache/datafusion/issues/11600) remove downcast_ref from here. Should file format factory be an extension to session state?
57-
let session_state = state.as_any().downcast_ref::<SessionState>().unwrap();
57+
let session_state =
58+
state
59+
.as_any()
60+
.downcast_ref::<SessionState>()
61+
.ok_or_else(|| {
62+
datafusion_common::internal_datafusion_err!(
63+
"ListingTableFactory requires SessionState"
64+
)
65+
})?;
5866
let file_format = session_state
5967
.get_file_format_factory(cmd.file_type.as_str())
6068
.ok_or(config_datafusion_err!(
@@ -546,4 +554,103 @@ mod tests {
546554
"Statistics cache should not be pre-warmed when collect_statistics is disabled"
547555
);
548556
}
557+
558+
#[tokio::test]
559+
async fn test_create_with_invalid_session() {
560+
use async_trait::async_trait;
561+
use datafusion_catalog::Session;
562+
use datafusion_common::Result;
563+
use datafusion_common::config::TableOptions;
564+
use datafusion_execution::TaskContext;
565+
use datafusion_execution::config::SessionConfig;
566+
use datafusion_physical_expr::PhysicalExpr;
567+
use datafusion_physical_plan::ExecutionPlan;
568+
use std::any::Any;
569+
use std::collections::HashMap;
570+
use std::sync::Arc;
571+
572+
// A mock Session that is NOT SessionState
573+
#[derive(Debug)]
574+
struct MockSession;
575+
576+
#[async_trait]
577+
impl Session for MockSession {
578+
fn session_id(&self) -> &str {
579+
"mock_session"
580+
}
581+
fn config(&self) -> &SessionConfig {
582+
unimplemented!()
583+
}
584+
async fn create_physical_plan(
585+
&self,
586+
_logical_plan: &datafusion_expr::LogicalPlan,
587+
) -> Result<Arc<dyn ExecutionPlan>> {
588+
unimplemented!()
589+
}
590+
fn create_physical_expr(
591+
&self,
592+
_expr: datafusion_expr::Expr,
593+
_df_schema: &DFSchema,
594+
) -> Result<Arc<dyn PhysicalExpr>> {
595+
unimplemented!()
596+
}
597+
fn scalar_functions(
598+
&self,
599+
) -> &HashMap<String, Arc<datafusion_expr::ScalarUDF>> {
600+
unimplemented!()
601+
}
602+
fn aggregate_functions(
603+
&self,
604+
) -> &HashMap<String, Arc<datafusion_expr::AggregateUDF>> {
605+
unimplemented!()
606+
}
607+
fn window_functions(
608+
&self,
609+
) -> &HashMap<String, Arc<datafusion_expr::WindowUDF>> {
610+
unimplemented!()
611+
}
612+
fn runtime_env(&self) -> &Arc<datafusion_execution::runtime_env::RuntimeEnv> {
613+
unimplemented!()
614+
}
615+
fn execution_props(
616+
&self,
617+
) -> &datafusion_expr::execution_props::ExecutionProps {
618+
unimplemented!()
619+
}
620+
fn as_any(&self) -> &dyn Any {
621+
self
622+
}
623+
fn table_options(&self) -> &TableOptions {
624+
unimplemented!()
625+
}
626+
fn table_options_mut(&mut self) -> &mut TableOptions {
627+
unimplemented!()
628+
}
629+
fn task_ctx(&self) -> Arc<TaskContext> {
630+
unimplemented!()
631+
}
632+
}
633+
634+
let factory = ListingTableFactory::new();
635+
let mock_session = MockSession;
636+
637+
let name = TableReference::bare("foo");
638+
let cmd = CreateExternalTable::builder(
639+
name,
640+
"foo.csv".to_string(),
641+
"csv",
642+
Arc::new(DFSchema::empty()),
643+
)
644+
.build();
645+
646+
// This should return an error, not panic
647+
let result = factory.create(&mock_session, &cmd).await;
648+
assert!(result.is_err());
649+
assert!(
650+
result
651+
.unwrap_err()
652+
.strip_backtrace()
653+
.contains("Internal error: ListingTableFactory requires SessionState")
654+
);
655+
}
549656
}

0 commit comments

Comments
 (0)