Skip to content

Commit f819061

Browse files
adriangbclaude
andauthored
Tweak adapter serialization example (#20035)
Followup to #19437 The main changes made here are: - Remove inner_plan_bytes from ExtensionPayload and store inner plan as protobuf child in extension.inputs instead of encoded bytes - No need to manually strip the adapter before serializing, that will happen regardless The end result is a slightly smaller line count and I think slightly better approach. --------- Co-authored-by: Claude Haiku 4.5 <noreply@anthropic.com>
1 parent 41d48b3 commit f819061

File tree

1 file changed

+31
-56
lines changed

1 file changed

+31
-56
lines changed

datafusion-examples/examples/custom_data_source/adapter_serialization.rs

Lines changed: 31 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@
2525
//! default. This example shows how to:
2626
//! 1. Detect plans with custom adapters during serialization
2727
//! 2. Wrap them as Extension nodes with JSON-serialized adapter metadata
28-
//! 3. Unwrap and restore the adapter during deserialization
28+
//! 3. Store the inner DataSourceExec (without adapter) as a child in the extension's inputs field
29+
//! 4. Unwrap and restore the adapter during deserialization
2930
//!
3031
//! This demonstrates nested serialization (protobuf outer, JSON inner) and the power
3132
//! of the `PhysicalExtensionCodec` interception pattern. Both plan and expression
@@ -69,7 +70,6 @@ use datafusion_proto::protobuf::{
6970
use object_store::memory::InMemory;
7071
use object_store::path::Path;
7172
use object_store::{ObjectStore, PutPayload};
72-
use prost::Message;
7373
use serde::{Deserialize, Serialize};
7474

7575
/// Example showing how to preserve custom adapter information during plan serialization.
@@ -234,10 +234,10 @@ impl PhysicalExprAdapterFactory for MetadataAdapterFactory {
234234
&self,
235235
logical_file_schema: SchemaRef,
236236
physical_file_schema: SchemaRef,
237-
) -> Arc<dyn PhysicalExprAdapter> {
237+
) -> Result<Arc<dyn PhysicalExprAdapter>> {
238238
let inner = DefaultPhysicalExprAdapterFactory
239-
.create(logical_file_schema, physical_file_schema);
240-
Arc::new(MetadataAdapter { inner })
239+
.create(logical_file_schema, physical_file_schema)?;
240+
Ok(Arc::new(MetadataAdapter { inner }))
241241
}
242242
}
243243

@@ -252,8 +252,6 @@ struct ExtensionPayload {
252252
marker: String,
253253
/// JSON-serialized adapter metadata
254254
adapter_metadata: AdapterMetadata,
255-
/// Protobuf-serialized inner DataSourceExec (without adapter)
256-
inner_plan_bytes: Vec<u8>,
257255
}
258256

259257
/// Metadata about the adapter to recreate it during deserialization
@@ -274,24 +272,20 @@ impl PhysicalExtensionCodec for AdapterPreservingCodec {
274272
fn try_decode(
275273
&self,
276274
buf: &[u8],
277-
_inputs: &[Arc<dyn ExecutionPlan>],
278-
ctx: &TaskContext,
275+
inputs: &[Arc<dyn ExecutionPlan>],
276+
_ctx: &TaskContext,
279277
) -> Result<Arc<dyn ExecutionPlan>> {
280278
// Try to parse as our extension payload
281279
if let Ok(payload) = serde_json::from_slice::<ExtensionPayload>(buf)
282280
&& payload.marker == EXTENSION_MARKER
283281
{
284-
// Decode the inner plan
285-
let inner_proto = PhysicalPlanNode::decode(&payload.inner_plan_bytes[..])
286-
.map_err(|e| {
287-
datafusion::error::DataFusionError::Plan(format!(
288-
"Failed to decode inner plan: {e}"
289-
))
290-
})?;
291-
292-
// Deserialize the inner plan using default implementation
293-
let inner_plan =
294-
inner_proto.try_into_physical_plan_with_converter(ctx, self, self)?;
282+
if inputs.len() != 1 {
283+
return Err(datafusion::error::DataFusionError::Plan(format!(
284+
"Extension node expected exactly 1 child, got {}",
285+
inputs.len()
286+
)));
287+
}
288+
let inner_plan = inputs[0].clone();
295289

296290
// Recreate the adapter factory
297291
let adapter_factory = create_adapter_factory(&payload.adapter_metadata.tag);
@@ -335,51 +329,39 @@ impl PhysicalProtoConverterExtension for AdapterPreservingCodec {
335329
// 1. Create adapter metadata
336330
let adapter_metadata = AdapterMetadata { tag };
337331

338-
// 2. Create a copy of the config without the adapter
339-
let config_without_adapter = rebuild_config_without_adapter(config);
340-
341-
// 3. Create a new DataSourceExec without adapter
342-
let plan_without_adapter: Arc<dyn ExecutionPlan> =
343-
DataSourceExec::from_data_source(config_without_adapter);
344-
345-
// 4. Serialize the inner plan to protobuf bytes
332+
// 2. Serialize the inner plan to protobuf
333+
// Note that this will drop the custom adapter since the default serialization cannot handle it
346334
let inner_proto = PhysicalPlanNode::try_from_physical_plan_with_converter(
347-
plan_without_adapter,
335+
Arc::clone(plan),
348336
extension_codec,
349337
self,
350338
)?;
351339

352-
let mut inner_bytes = Vec::new();
353-
inner_proto.encode(&mut inner_bytes).map_err(|e| {
354-
datafusion::error::DataFusionError::Plan(format!(
355-
"Failed to encode inner plan: {e}"
356-
))
357-
})?;
358-
359-
// 5. Create extension payload
340+
// 3. Create extension payload to wrap the plan
341+
// so that the custom adapter gets re-attached during deserialization
342+
// The choice of JSON is arbitrary; other formats could be used.
360343
let payload = ExtensionPayload {
361344
marker: EXTENSION_MARKER.to_string(),
362345
adapter_metadata,
363-
inner_plan_bytes: inner_bytes,
364346
};
365347
let payload_bytes = serde_json::to_vec(&payload).map_err(|e| {
366348
datafusion::error::DataFusionError::Plan(format!(
367349
"Failed to serialize payload: {e}"
368350
))
369351
})?;
370352

371-
// 6. Return as PhysicalExtensionNode
353+
// 4. Return as PhysicalExtensionNode with child plan in inputs
372354
return Ok(PhysicalPlanNode {
373355
physical_plan_type: Some(PhysicalPlanType::Extension(
374356
PhysicalExtensionNode {
375357
node: payload_bytes,
376-
inputs: vec![], // Leaf node
358+
inputs: vec![inner_proto],
377359
},
378360
)),
379361
});
380362
}
381363

382-
// No adapter found - use default serialization
364+
// No adapter found, not a DataSourceExec, etc. - use default serialization
383365
PhysicalPlanNode::try_from_physical_plan_with_converter(
384366
Arc::clone(plan),
385367
extension_codec,
@@ -405,15 +387,15 @@ impl PhysicalProtoConverterExtension for AdapterPreservingCodec {
405387
payload.adapter_metadata.tag
406388
);
407389

408-
// Decode the inner plan
409-
let inner_proto = PhysicalPlanNode::decode(&payload.inner_plan_bytes[..])
410-
.map_err(|e| {
411-
datafusion::error::DataFusionError::Plan(format!(
412-
"Failed to decode inner plan: {e}"
413-
))
414-
})?;
390+
// Get the inner plan proto from inputs field
391+
if extension.inputs.is_empty() {
392+
return Err(datafusion::error::DataFusionError::Plan(
393+
"Extension node missing child plan in inputs".to_string(),
394+
));
395+
}
396+
let inner_proto = &extension.inputs[0];
415397

416-
// Deserialize the inner plan using default implementation
398+
// Deserialize the inner plan
417399
let inner_plan = inner_proto.try_into_physical_plan_with_converter(
418400
ctx,
419401
extension_codec,
@@ -494,13 +476,6 @@ fn create_adapter_factory(tag: &str) -> Arc<dyn PhysicalExprAdapterFactory> {
494476
Arc::new(MetadataAdapterFactory::new(tag))
495477
}
496478

497-
/// Rebuild a FileScanConfig without the adapter
498-
fn rebuild_config_without_adapter(config: &FileScanConfig) -> FileScanConfig {
499-
FileScanConfigBuilder::from(config.clone())
500-
.with_expr_adapter(None)
501-
.build()
502-
}
503-
504479
/// Inject an adapter into a plan (assumes plan is a DataSourceExec with FileScanConfig)
505480
fn inject_adapter_into_plan(
506481
plan: Arc<dyn ExecutionPlan>,

0 commit comments

Comments
 (0)