Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 108 additions & 1 deletion datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,15 @@ impl TableProviderFactory for ListingTableFactory {
cmd: &CreateExternalTable,
) -> Result<Arc<dyn TableProvider>> {
// TODO (https://github.com/apache/datafusion/issues/11600) remove downcast_ref from here. Should file format factory be an extension to session state?
let session_state = state.as_any().downcast_ref::<SessionState>().unwrap();
let session_state =
state
.as_any()
.downcast_ref::<SessionState>()
.ok_or_else(|| {
datafusion_common::internal_datafusion_err!(
"ListingTableFactory requires SessionState"
)
})?;
let file_format = session_state
.get_file_format_factory(cmd.file_type.as_str())
.ok_or(config_datafusion_err!(
Expand Down Expand Up @@ -546,4 +554,103 @@ mod tests {
"Statistics cache should not be pre-warmed when collect_statistics is disabled"
);
}

#[tokio::test]
async fn test_create_with_invalid_session() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test is probably overkill, but it isn't a terrible thing to have coverage

use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_common::Result;
use datafusion_common::config::TableOptions;
use datafusion_execution::TaskContext;
use datafusion_execution::config::SessionConfig;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::ExecutionPlan;
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

// A mock Session that is NOT SessionState
#[derive(Debug)]
struct MockSession;

#[async_trait]
impl Session for MockSession {
fn session_id(&self) -> &str {
"mock_session"
}
fn config(&self) -> &SessionConfig {
unimplemented!()
}
async fn create_physical_plan(
&self,
_logical_plan: &datafusion_expr::LogicalPlan,
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!()
}
fn create_physical_expr(
&self,
_expr: datafusion_expr::Expr,
_df_schema: &DFSchema,
) -> Result<Arc<dyn PhysicalExpr>> {
unimplemented!()
}
fn scalar_functions(
&self,
) -> &HashMap<String, Arc<datafusion_expr::ScalarUDF>> {
unimplemented!()
}
fn aggregate_functions(
&self,
) -> &HashMap<String, Arc<datafusion_expr::AggregateUDF>> {
unimplemented!()
}
fn window_functions(
&self,
) -> &HashMap<String, Arc<datafusion_expr::WindowUDF>> {
unimplemented!()
}
fn runtime_env(&self) -> &Arc<datafusion_execution::runtime_env::RuntimeEnv> {
unimplemented!()
}
fn execution_props(
&self,
) -> &datafusion_expr::execution_props::ExecutionProps {
unimplemented!()
}
fn as_any(&self) -> &dyn Any {
self
}
fn table_options(&self) -> &TableOptions {
unimplemented!()
}
fn table_options_mut(&mut self) -> &mut TableOptions {
unimplemented!()
}
fn task_ctx(&self) -> Arc<TaskContext> {
unimplemented!()
}
}

let factory = ListingTableFactory::new();
let mock_session = MockSession;

let name = TableReference::bare("foo");
let cmd = CreateExternalTable::builder(
name,
"foo.csv".to_string(),
"csv",
Arc::new(DFSchema::empty()),
)
.build();

// This should return an error, not panic
let result = factory.create(&mock_session, &cmd).await;
assert!(result.is_err());
assert!(
result
.unwrap_err()
.strip_backtrace()
.contains("Internal error: ListingTableFactory requires SessionState")
);
}
}
Loading