From b5fbd1dba2c8ab75cbede6a66cfed2c6906f0a13 Mon Sep 17 00:00:00 2001 From: Brijesh-Thakkar Date: Tue, 2 Jun 2026 10:59:48 +0530 Subject: [PATCH 1/4] feat(ffi): plumb simplify, expressions, reverse_expr, documentation through FFI_WindowUDF Closes #22332 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add documentation, expressions, simplify, and reverse_expr fn pointers to FFI_WindowUDF struct Implement producer-side wrappers for each new fn pointer Wire ForeignWindowUDF trait methods to call the fn pointers Add FFI_ExpressionArgs in a new expression_args.rs sibling module to carry Vec<Arc<dyn PhysicalExpr>> across the boundary Add FFI_ReversedUDWF enum (#[repr(C, u8)]) to represent the three ReversedUDWF variants stably simplify executes on the producer side and returns a serialized Expr; the consumer closure deserializes and returns the result Unit tests (local-bypass + forced-foreign) for all four methods Integration test for documentation in ffi_udwf.rs Note: ForeignWindowUDF::simplify() uses DefaultLogicalExtensionCodec internally; UDWFs requiring a custom codec are a known follow-up. API change: FFI_WindowUDF struct layout modified — target main only, no backport. --- datafusion/ffi/src/udwf/expression_args.rs | 89 +++++ datafusion/ffi/src/udwf/mod.rs | 441 ++++++++++++++++++++- datafusion/ffi/tests/ffi_udwf.rs | 63 +++ 3 files changed, 583 insertions(+), 10 deletions(-) create mode 100644 datafusion/ffi/src/udwf/expression_args.rs diff --git a/datafusion/ffi/src/udwf/expression_args.rs b/datafusion/ffi/src/udwf/expression_args.rs new file mode 100644 index 0000000000000..27bd56eb23368 --- /dev/null +++ b/datafusion/ffi/src/udwf/expression_args.rs @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use arrow::ffi::FFI_ArrowSchema; +use arrow_schema::FieldRef; +use datafusion_common::{DataFusionError, Result}; +use datafusion_expr::function::ExpressionArgs; +use datafusion_physical_expr::PhysicalExpr; +use stabby::vec::Vec as SVec; + +use crate::arrow_wrappers::WrappedSchema; +use crate::physical_expr::FFI_PhysicalExpr; +use crate::util::rvec_wrapped_to_vec_fieldref; + +/// A stable struct for sharing [`ExpressionArgs`] across FFI boundaries. +#[repr(C)] +#[derive(Debug)] +pub struct FFI_ExpressionArgs { + input_exprs: SVec, + input_fields: SVec, +} + +impl TryFrom> for FFI_ExpressionArgs { + type Error = DataFusionError; + + fn try_from(args: ExpressionArgs) -> Result { + let input_exprs = args + .input_exprs() + .iter() + .map(Arc::clone) + .map(FFI_PhysicalExpr::from) + .collect(); + + let input_fields = args + .input_fields() + .iter() + .map(|field| FFI_ArrowSchema::try_from(field.as_ref()).map(WrappedSchema)) + .collect::, _>>()? 
+ .into_iter() + .collect(); + + Ok(Self { + input_exprs, + input_fields, + }) + } +} + +pub struct ForeignExpressionArgs { + input_exprs: Vec>, + input_fields: Vec, +} + +impl TryFrom for ForeignExpressionArgs { + type Error = DataFusionError; + + fn try_from(value: FFI_ExpressionArgs) -> Result { + let input_exprs = value.input_exprs.iter().map(Into::into).collect(); + + let input_fields = rvec_wrapped_to_vec_fieldref(&value.input_fields)?; + + Ok(Self { + input_exprs, + input_fields, + }) + } +} + +impl<'a> From<&'a ForeignExpressionArgs> for ExpressionArgs<'a> { + fn from(value: &'a ForeignExpressionArgs) -> Self { + ExpressionArgs::new(&value.input_exprs, &value.input_fields) + } +} diff --git a/datafusion/ffi/src/udwf/mod.rs b/datafusion/ffi/src/udwf/mod.rs index bff46386709f9..f703cb226b926 100644 --- a/datafusion/ffi/src/udwf/mod.rs +++ b/datafusion/ffi/src/udwf/mod.rs @@ -19,16 +19,21 @@ use std::ffi::c_void; use std::hash::{Hash, Hasher}; use std::sync::Arc; + use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Schema, SchemaRef}; use arrow_schema::{Field, FieldRef}; -use datafusion_common::{Result, ffi_err}; -use datafusion_expr::function::WindowUDFFieldArgs; +use datafusion_common::{Result, ToDFSchema, ffi_err}; +use datafusion_expr::function::{WindowFunctionSimplification, WindowUDFFieldArgs}; use datafusion_expr::type_coercion::functions::fields_with_udf; use datafusion_expr::{ - LimitEffect, PartitionEvaluator, Signature, WindowUDF, WindowUDFImpl, + Documentation, Expr, LimitEffect, PartitionEvaluator, ReversedUDWF, Signature, WindowUDF, + WindowUDFImpl, }; +use datafusion_expr::simplify::SimplifyContext; +use datafusion_expr::registry::FunctionRegistry; use datafusion_physical_expr::PhysicalExpr; +use expression_args::{FFI_ExpressionArgs, ForeignExpressionArgs}; use partition_evaluator::FFI_PartitionEvaluator; use partition_evaluator_args::{ FFI_PartitionEvaluatorArgs, ForeignPartitionEvaluatorArgs, @@ -37,17 +42,20 @@ use 
partition_evaluator_args::{ use stabby::string::String as SString; use stabby::vec::Vec as SVec; +mod expression_args; mod partition_evaluator; mod partition_evaluator_args; mod range; use crate::arrow_wrappers::WrappedSchema; +use crate::physical_expr::FFI_PhysicalExpr; use crate::util::{ FFI_Option, FFI_Result, rvec_wrapped_to_vec_datatype, rvec_wrapped_to_vec_fieldref, vec_datatype_to_rvec_wrapped, vec_fieldref_to_rvec_wrapped, }; -use crate::volatility::FFI_Volatility; use crate::{df_result, sresult, sresult_return}; +use prost::Message; +use crate::volatility::FFI_Volatility; /// A stable struct for sharing a [`WindowUDF`] across FFI boundaries. #[repr(C)] @@ -68,17 +76,29 @@ pub struct FFI_WindowUDF { ) -> FFI_Result, - pub field: unsafe extern "C" fn( +pub field: unsafe extern "C" fn( udwf: &Self, input_types: SVec, display_name: SString, ) -> FFI_Result, - /// Performs type coercion. To simply this interface, all UDFs are treated as having - /// user defined signatures, which will in turn call coerce_types to be called. 
This - /// call should be transparent to most users as the internal function performs the - /// appropriate calls on the underlying [`WindowUDF`] + pub documentation: unsafe extern "C" fn(udwf: &Self) -> *const Documentation, + + pub expressions: unsafe extern "C" fn( + udwf: &Self, + args: FFI_ExpressionArgs, + ) -> SVec, + + pub simplify: unsafe extern "C" fn( + udwf: &Self, + window_function: SVec, + schema: WrappedSchema, + ) -> FFI_Result>>, + + pub reverse_expr: unsafe extern "C" fn(udwf: &Self) -> FFI_ReversedUDWF, + pub coerce_types: unsafe extern "C" fn( + udf: &Self, arg_types: SVec, ) -> FFI_Result>, @@ -155,6 +175,93 @@ unsafe extern "C" fn field_fn_wrapper( } } +unsafe extern "C" fn documentation_fn_wrapper(udwf: &FFI_WindowUDF) -> *const Documentation { + unsafe { + let inner = udwf.inner(); + match inner.documentation() { + Some(doc) => doc as *const Documentation, + None => std::ptr::null(), + } + } +} + +unsafe extern "C" fn expressions_fn_wrapper( + udwf: &FFI_WindowUDF, + args: FFI_ExpressionArgs, +) -> SVec { + unsafe { + let inner = udwf.inner(); + let args = ForeignExpressionArgs::try_from(args).unwrap(); + let expressions = inner.expressions((&args).into()); + expressions.into_iter().map(FFI_PhysicalExpr::from).collect() + } +} + +unsafe extern "C" fn simplify_fn_wrapper( + udwf: &FFI_WindowUDF, + window_function_bytes: SVec, + schema: WrappedSchema, +) -> FFI_Result>> { + unsafe { + let inner = udwf.inner(); + + // 1. 
Deserialize bytes to Expr using Default codec + let protobuf = sresult_return!( + datafusion_proto::protobuf::LogicalExprNode::decode( + window_function_bytes.as_ref() + ) + ); + let mut ctx = datafusion_execution::TaskContext::default(); + // Register the wrapped UDWF so it can be resolved during deserialization + sresult_return!(ctx.register_udwf(Arc::clone(inner))); + let codec = datafusion_proto::logical_plan::DefaultLogicalExtensionCodec {}; + let expr = sresult_return!(datafusion_proto::logical_plan::from_proto::parse_expr( + &protobuf, + &ctx, + &codec + )); + + // 2. Extract WindowFunction from Expr + let window_function = match expr { + Expr::WindowFunction(wf) => wf, + _ => return FFI_Result::Err("Expected WindowFunction Expr".into()), + }; + + // 3. Create dummy SimplifyContext + let schema_ref: SchemaRef = schema.into(); + let df_schema = sresult_return!(schema_ref.to_dfschema_ref()); + let info = SimplifyContext::builder().with_schema(df_schema).build(); + + // 4. Call inner.simplify() + match inner.simplify() { + Some(simplify_fn) => { + let simplified_expr = sresult_return!(simplify_fn(*window_function, &info)); + let protobuf = sresult_return!( + datafusion_proto::logical_plan::to_proto::serialize_expr( + &simplified_expr, + &codec + ) + ); + let mut buffer = Vec::new(); + sresult_return!(Message::encode(&protobuf, &mut buffer)); + FFI_Result::Ok(FFI_Option::Some(buffer.into_iter().collect())) + } + None => FFI_Result::Ok(FFI_Option::None), + } + } +} + +unsafe extern "C" fn reverse_expr_fn_wrapper(udwf: &FFI_WindowUDF) -> FFI_ReversedUDWF { + unsafe { + let inner = udwf.inner(); + match inner.reverse_expr() { + ReversedUDWF::Identical => FFI_ReversedUDWF::Identical, + ReversedUDWF::NotSupported => FFI_ReversedUDWF::NotSupported, + ReversedUDWF::Reversed(udf) => FFI_ReversedUDWF::Reversed(udf.into()), + } + } +} + unsafe extern "C" fn coerce_types_fn_wrapper( udwf: &FFI_WindowUDF, arg_types: SVec, @@ -207,6 +314,10 @@ unsafe extern "C" fn 
clone_fn_wrapper(udwf: &FFI_WindowUDF) -> FFI_WindowUDF { sort_options: udwf.sort_options.clone(), coerce_types: coerce_types_fn_wrapper, field: field_fn_wrapper, + documentation: documentation_fn_wrapper, + expressions: expressions_fn_wrapper, + simplify: simplify_fn_wrapper, + reverse_expr: reverse_expr_fn_wrapper, clone: clone_fn_wrapper, release: release_fn_wrapper, private_data: Box::into_raw(private_data) as *mut c_void, @@ -242,6 +353,10 @@ impl From> for FFI_WindowUDF { sort_options, coerce_types: coerce_types_fn_wrapper, field: field_fn_wrapper, + documentation: documentation_fn_wrapper, + expressions: expressions_fn_wrapper, + simplify: simplify_fn_wrapper, + reverse_expr: reverse_expr_fn_wrapper, clone: clone_fn_wrapper, release: release_fn_wrapper, private_data: Box::into_raw(private_data) as *mut c_void, @@ -339,7 +454,7 @@ impl WindowUDFImpl for ForeignWindowUDF { df_result!(evaluator).map(>::from) } - fn field(&self, field_args: WindowUDFFieldArgs) -> Result { +fn field(&self, field_args: WindowUDFFieldArgs) -> Result { unsafe { let input_types = vec_fieldref_to_rvec_wrapped(field_args.input_fields())?; let schema = df_result!((self.udf.field)( @@ -358,6 +473,75 @@ impl WindowUDFImpl for ForeignWindowUDF { } } + fn documentation(&self) -> Option<&Documentation> { + unsafe { + let ptr = (self.udf.documentation)(&self.udf); + ptr.as_ref() + } + } + + fn expressions( + &self, + expr_args: datafusion_expr::function::ExpressionArgs, + ) -> Vec> { + unsafe { + let args = FFI_ExpressionArgs::try_from(expr_args).unwrap(); + (self.udf.expressions)(&self.udf, args) + .into_iter() + .map(|e| Arc::::from(&e)) + .collect() + } + } + + fn simplify(&self) -> Option { + let udf = self.udf.clone(); + Some(Box::new(move |wf, info| { + let codec = datafusion_proto::logical_plan::DefaultLogicalExtensionCodec {}; + + // To serialize the window function + let expr = Expr::WindowFunction(Box::new(wf)); + let protobuf = 
datafusion_proto::logical_plan::to_proto::serialize_expr( + &expr, + &codec + ).map_err(|e| datafusion_common::DataFusionError::Plan(e.to_string()))?; + + let mut buffer = Vec::new(); + Message::encode(&protobuf, &mut buffer) + .map_err(|e| datafusion_common::DataFusionError::Plan(e.to_string()))?; + + let schema_ref: SchemaRef = Arc::new(info.schema().as_arrow().clone()); + let schema = WrappedSchema::from(schema_ref); + + // Call the FFI function + let result = unsafe { + (udf.simplify)(&udf, buffer.into_iter().collect(), schema) + }; + + let result: Option> = crate::df_result!(result)?.into(); + + match result { + Some(bytes) => { + let protobuf = datafusion_proto::protobuf::LogicalExprNode::decode(bytes.as_slice()) + .map_err(|e| datafusion_common::DataFusionError::Plan(e.to_string()))?; + let ctx = datafusion_execution::TaskContext::default(); + let simplified_expr = datafusion_proto::logical_plan::from_proto::parse_expr( + &protobuf, + &ctx, + &codec, + )?; + Ok(simplified_expr) + } + None => { + Ok(expr) + } + } + })) + } + + fn reverse_expr(&self) -> ReversedUDWF { + unsafe { (self.udf.reverse_expr)(&self.udf).into() } + } + fn sort_options(&self) -> Option { let options: Option<&FFI_SortOptions> = self.udf.sort_options.as_ref(); options.map(|s| s.into()) @@ -393,6 +577,27 @@ impl From<&FFI_SortOptions> for SortOptions { } } +#[repr(C, u8)] +#[derive(Debug, Clone)] +pub enum FFI_ReversedUDWF { + Identical, + NotSupported, + Reversed(FFI_WindowUDF), +} + +impl From for ReversedUDWF { + fn from(value: FFI_ReversedUDWF) -> Self { + match value { + FFI_ReversedUDWF::Identical => ReversedUDWF::Identical, + FFI_ReversedUDWF::NotSupported => ReversedUDWF::NotSupported, + FFI_ReversedUDWF::Reversed(ffi_udf) => { + let udf_impl: Arc = (&ffi_udf).into(); + ReversedUDWF::Reversed(Arc::new(WindowUDF::new_from_shared_impl(udf_impl))) + } + } + } +} + #[cfg(test)] #[cfg(feature = "integration-tests")] mod tests { @@ -403,6 +608,9 @@ mod tests { use 
datafusion::logical_expr::expr::Sort; use datafusion::logical_expr::{ExprFunctionExt, WindowUDF, WindowUDFImpl, col}; use datafusion::prelude::SessionContext; + use datafusion_expr::{Signature, PartitionEvaluator}; + use datafusion_expr::function::WindowUDFFieldArgs; + use arrow_schema::FieldRef; use crate::tests::create_record_batch; use crate::udwf::{FFI_WindowUDF, ForeignWindowUDF}; @@ -482,4 +690,217 @@ mod tests { Ok(()) } + + #[test] + fn test_ffi_udwf_documentation() -> datafusion_common::Result<()> { + use datafusion_expr::{DocSection, Documentation, Volatility, function::PartitionEvaluatorArgs}; + + #[derive(Debug, Clone, PartialEq, Eq, Hash)] + struct MockUDWFWithDoc { + signature: Signature, + doc: Documentation, + } + + impl WindowUDFImpl for MockUDWFWithDoc { + fn name(&self) -> &str { "mock_doc" } + fn signature(&self) -> &Signature { &self.signature } + fn partition_evaluator(&self, _: PartitionEvaluatorArgs) -> datafusion_common::Result> { unimplemented!() } + fn field(&self, _: WindowUDFFieldArgs) -> datafusion_common::Result { unimplemented!() } + fn documentation(&self) -> Option<&Documentation> { Some(&self.doc) } + } + + let doc = Documentation::builder(DocSection::default(), "description", "syntax").build(); + let original_udwf = Arc::new(WindowUDF::from(MockUDWFWithDoc { + signature: Signature::any(0, Volatility::Immutable), + doc: doc.clone(), + })); + + let mut ffi_udwf = FFI_WindowUDF::from(Arc::clone(&original_udwf)); + ffi_udwf.library_marker_id = crate::mock_foreign_marker_id; + let foreign_udwf: Arc = (&ffi_udwf).into(); + assert_eq!(foreign_udwf.documentation(), Some(&doc)); + Ok(()) + } + + #[test] + fn test_ffi_udwf_expressions() -> datafusion_common::Result<()> { + use arrow::datatypes::DataType; + use datafusion_expr::{Volatility, function::{ExpressionArgs, PartitionEvaluatorArgs}}; + use datafusion_physical_expr::PhysicalExpr; + use datafusion_physical_expr::expressions::col; + + #[derive(Debug, Clone, PartialEq, Eq, Hash)] + 
struct MockUDWFWithExprs { + signature: Signature, + } + + impl WindowUDFImpl for MockUDWFWithExprs { + fn name(&self) -> &str { "mock_exprs" } + fn signature(&self) -> &Signature { &self.signature } + fn partition_evaluator(&self, _: PartitionEvaluatorArgs) -> datafusion_common::Result> { unimplemented!() } + fn field(&self, _: WindowUDFFieldArgs) -> datafusion_common::Result { unimplemented!() } + fn expressions(&self, args: ExpressionArgs) -> Vec> { + args.input_exprs().iter().rev().cloned().collect() + } + } + + let original_udwf = Arc::new(WindowUDF::from(MockUDWFWithExprs { + signature: Signature::any(0, Volatility::Immutable), + })); + + let schema = arrow::datatypes::Schema::new(vec![ + arrow::datatypes::Field::new("a", DataType::Int32, true), + arrow::datatypes::Field::new("b", DataType::Int32, true), + ]); + let expr_a = col("a", &schema)?; + let expr_b = col("b", &schema)?; + let fields = vec![ + Arc::new(arrow::datatypes::Field::new("a", DataType::Int32, true)), + Arc::new(arrow::datatypes::Field::new("b", DataType::Int32, true)), + ]; + + let mut ffi_udwf = FFI_WindowUDF::from(Arc::clone(&original_udwf)); + ffi_udwf.library_marker_id = crate::mock_foreign_marker_id; + let foreign_udwf: Arc = (&ffi_udwf).into(); + + let input_exprs = [expr_a, expr_b]; + let args = ExpressionArgs::new(&input_exprs, &fields); + let result = foreign_udwf.expressions(args); + assert_eq!(result.len(), 2); + assert_eq!(format!("{}", result[0]), "b@1"); + assert_eq!(format!("{}", result[1]), "a@0"); + Ok(()) + } + + #[test] + fn test_ffi_udwf_simplify() -> datafusion_common::Result<()> { + use datafusion_expr::{Volatility, function::{PartitionEvaluatorArgs, WindowFunctionSimplification}, lit}; + + #[derive(Debug, Clone, PartialEq, Eq, Hash)] + struct MockUDWFSimplify { + signature: Signature, + } + + impl WindowUDFImpl for MockUDWFSimplify { + fn name(&self) -> &str { "mock_simplify" } + fn signature(&self) -> &Signature { &self.signature } + fn partition_evaluator(&self, _: 
PartitionEvaluatorArgs) -> datafusion_common::Result> { unimplemented!() } + fn field(&self, _: WindowUDFFieldArgs) -> datafusion_common::Result { unimplemented!() } + fn simplify(&self) -> Option { + Some(Box::new(|_, _| Ok(lit(1)))) + } + } + + let original_udwf = Arc::new(WindowUDF::from(MockUDWFSimplify { + signature: Signature::any(0, Volatility::Immutable), + })); + let mut ffi_udwf = FFI_WindowUDF::from(Arc::clone(&original_udwf)); + ffi_udwf.library_marker_id = crate::mock_foreign_marker_id; + + let foreign_udwf: Arc = (&ffi_udwf).into(); + let simplify_fn = foreign_udwf.simplify().unwrap(); + + let wf = datafusion_expr::expr::WindowFunction { + fun: datafusion_expr::WindowFunctionDefinition::WindowUDF(original_udwf), + params: datafusion_expr::expr::WindowFunctionParams { + args: vec![], + partition_by: vec![], + order_by: vec![], + window_frame: datafusion_expr::WindowFrame::new(None), + filter: None, + null_treatment: None, + distinct: false, + }, + }; + + let schema = arrow::datatypes::Schema::empty(); + let df_schema = datafusion_common::DFSchema::try_from(schema).unwrap(); + let info = datafusion_expr::simplify::SimplifyContext::builder().with_schema(Arc::new(df_schema)).build(); + + let simplified_expr = simplify_fn(wf, &info).unwrap(); + assert_eq!(simplified_expr, lit(1)); + + Ok(()) + } + + #[test] + fn test_ffi_udwf_reverse_expr() -> datafusion_common::Result<()> { + use datafusion_expr::{ReversedUDWF, Volatility, function::PartitionEvaluatorArgs}; + + #[derive(Debug, Clone, PartialEq, Eq, Hash)] + struct MockUDWFReverse { + signature: Signature, + } + + impl WindowUDFImpl for MockUDWFReverse { + fn name(&self) -> &str { "mock_reverse" } + fn signature(&self) -> &Signature { &self.signature } + fn partition_evaluator(&self, _: PartitionEvaluatorArgs) -> datafusion_common::Result> { unimplemented!() } + fn field(&self, _: WindowUDFFieldArgs) -> datafusion_common::Result { unimplemented!() } + fn reverse_expr(&self) -> ReversedUDWF { 
ReversedUDWF::Identical } + } + + let original_udwf = Arc::new(WindowUDF::from(MockUDWFReverse { + signature: Signature::any(0, Volatility::Immutable), + })); + + let mut ffi_udwf = FFI_WindowUDF::from(Arc::clone(&original_udwf)); + ffi_udwf.library_marker_id = crate::mock_foreign_marker_id; + let foreign_udwf: Arc = (&ffi_udwf).into(); + assert!(matches!(foreign_udwf.reverse_expr(), ReversedUDWF::Identical)); + + Ok(()) + } + + #[test] + fn test_ffi_udwf_reverse_expr_recursive() -> datafusion_common::Result<()> { + use datafusion_expr::{ReversedUDWF, Volatility, function::PartitionEvaluatorArgs}; + + #[derive(Debug, Clone, PartialEq, Eq, Hash)] + struct MockUDWFRecursive { + signature: Signature, + reversed: Arc, + } + + impl WindowUDFImpl for MockUDWFRecursive { + fn name(&self) -> &str { "mock_recursive" } + fn signature(&self) -> &Signature { &self.signature } + fn partition_evaluator(&self, _: PartitionEvaluatorArgs) -> datafusion_common::Result> { unimplemented!() } + fn field(&self, _: WindowUDFFieldArgs) -> datafusion_common::Result { unimplemented!() } + fn reverse_expr(&self) -> ReversedUDWF { ReversedUDWF::Reversed(Arc::clone(&self.reversed)) } + } + + #[derive(Debug, Clone, PartialEq, Eq, Hash)] + struct MockUDWFSimple { + signature: Signature, + } + impl WindowUDFImpl for MockUDWFSimple { + fn name(&self) -> &str { "mock_simple" } + fn signature(&self) -> &Signature { &self.signature } + fn partition_evaluator(&self, _: PartitionEvaluatorArgs) -> datafusion_common::Result> { unimplemented!() } + fn field(&self, _: WindowUDFFieldArgs) -> datafusion_common::Result { unimplemented!() } + } + + let reversed = Arc::new(WindowUDF::from(MockUDWFSimple { + signature: Signature::any(0, Volatility::Immutable), + })); + let original_udwf = Arc::new(WindowUDF::from(MockUDWFRecursive { + signature: Signature::any(0, Volatility::Immutable), + reversed: Arc::clone(&reversed), + })); + + // Forced foreign path + let mut ffi_udwf = 
FFI_WindowUDF::from(Arc::clone(&original_udwf)); + ffi_udwf.library_marker_id = crate::mock_foreign_marker_id; + let foreign_udwf: Arc = (&ffi_udwf).into(); + + let result = foreign_udwf.reverse_expr(); + if let ReversedUDWF::Reversed(res_udf) = result { + assert_eq!(res_udf.name(), "mock_simple"); + } else { + panic!("Expected Reversed variant"); + } + + Ok(()) + } } diff --git a/datafusion/ffi/tests/ffi_udwf.rs b/datafusion/ffi/tests/ffi_udwf.rs index 66f2621d5fe63..145a558208ff1 100644 --- a/datafusion/ffi/tests/ffi_udwf.rs +++ b/datafusion/ffi/tests/ffi_udwf.rs @@ -60,4 +60,67 @@ mod tests { Ok(()) } + + #[test] + fn test_udwf_documentation() -> Result<()> { + use datafusion::logical_expr::{DocSection, Documentation}; + use datafusion_ffi::udwf::FFI_WindowUDF; + + #[derive(Debug, Clone, PartialEq, Eq, Hash)] + struct MockUDWFWithDoc { + signature: datafusion::logical_expr::Signature, + doc: Documentation, + } + + impl WindowUDFImpl for MockUDWFWithDoc { + fn name(&self) -> &str { + "mock_doc" + } + fn signature(&self) -> &datafusion::logical_expr::Signature { + &self.signature + } + fn partition_evaluator( + &self, + _: datafusion_expr::function::PartitionEvaluatorArgs, + ) -> Result> { + unimplemented!() + } + fn field( + &self, + _: datafusion_expr::function::WindowUDFFieldArgs, + ) -> Result { + unimplemented!() + } + fn documentation(&self) -> Option<&Documentation> { + Some(&self.doc) + } + } + + let doc = Documentation::builder( + DocSection::default(), + "description", + "syntax", + ) + .build(); + let original_udwf = Arc::new(WindowUDF::from(MockUDWFWithDoc { + signature: datafusion::logical_expr::Signature::any( + 0, + datafusion::logical_expr::Volatility::Immutable, + ), + doc: doc.clone(), + })); + + let mut ffi_udwf = FFI_WindowUDF::from(original_udwf); + extern "C" fn mock_marker() -> usize { + 0xdeadbeef + } + ffi_udwf.library_marker_id = mock_marker; + + let foreign_udwf_impl: Arc = (&ffi_udwf).into(); + let foreign_udwf = 
WindowUDF::new_from_shared_impl(foreign_udwf_impl); + + assert_eq!(foreign_udwf.documentation(), Some(&doc)); + + Ok(()) + } } From 763e559ab522b911290054b4166770d65e24d4c2 Mon Sep 17 00:00:00 2001 From: Brijesh-Thakkar Date: Tue, 2 Jun 2026 17:40:18 +0530 Subject: [PATCH 2/4] fix(ffi): address review feedback on FFI_WindowUDF plumbing --- datafusion/ffi/src/udwf/expression_args.rs | 2 +- datafusion/ffi/src/udwf/mod.rs | 306 +++++++++++++++------ datafusion/ffi/tests/ffi_udwf.rs | 11 +- 3 files changed, 225 insertions(+), 94 deletions(-) diff --git a/datafusion/ffi/src/udwf/expression_args.rs b/datafusion/ffi/src/udwf/expression_args.rs index 27bd56eb23368..37e94425cd161 100644 --- a/datafusion/ffi/src/udwf/expression_args.rs +++ b/datafusion/ffi/src/udwf/expression_args.rs @@ -39,7 +39,7 @@ pub struct FFI_ExpressionArgs { impl TryFrom> for FFI_ExpressionArgs { type Error = DataFusionError; - fn try_from(args: ExpressionArgs) -> Result { + fn try_from(args: ExpressionArgs) -> Result { let input_exprs = args .input_exprs() .iter() diff --git a/datafusion/ffi/src/udwf/mod.rs b/datafusion/ffi/src/udwf/mod.rs index f703cb226b926..fcb2ee25369f9 100644 --- a/datafusion/ffi/src/udwf/mod.rs +++ b/datafusion/ffi/src/udwf/mod.rs @@ -19,19 +19,18 @@ use std::ffi::c_void; use std::hash::{Hash, Hasher}; use std::sync::Arc; - use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Schema, SchemaRef}; use arrow_schema::{Field, FieldRef}; use datafusion_common::{Result, ToDFSchema, ffi_err}; use datafusion_expr::function::{WindowFunctionSimplification, WindowUDFFieldArgs}; +use datafusion_expr::registry::FunctionRegistry; +use datafusion_expr::simplify::SimplifyContext; use datafusion_expr::type_coercion::functions::fields_with_udf; use datafusion_expr::{ - Documentation, Expr, LimitEffect, PartitionEvaluator, ReversedUDWF, Signature, WindowUDF, - WindowUDFImpl, + Documentation, Expr, LimitEffect, PartitionEvaluator, ReversedUDWF, Signature, + WindowUDF, WindowUDFImpl, 
}; -use datafusion_expr::simplify::SimplifyContext; -use datafusion_expr::registry::FunctionRegistry; use datafusion_physical_expr::PhysicalExpr; use expression_args::{FFI_ExpressionArgs, ForeignExpressionArgs}; use partition_evaluator::FFI_PartitionEvaluator; @@ -53,9 +52,9 @@ use crate::util::{ FFI_Option, FFI_Result, rvec_wrapped_to_vec_datatype, rvec_wrapped_to_vec_fieldref, vec_datatype_to_rvec_wrapped, vec_fieldref_to_rvec_wrapped, }; +use crate::volatility::FFI_Volatility; use crate::{df_result, sresult, sresult_return}; use prost::Message; -use crate::volatility::FFI_Volatility; /// A stable struct for sharing a [`WindowUDF`] across FFI boundaries. #[repr(C)] @@ -76,35 +75,41 @@ pub struct FFI_WindowUDF { ) -> FFI_Result, -pub field: unsafe extern "C" fn( + pub field: unsafe extern "C" fn( udwf: &Self, input_types: SVec, display_name: SString, ) -> FFI_Result, + /// Pointer lifetime is tied to the inner Arc; null = None pub documentation: unsafe extern "C" fn(udwf: &Self) -> *const Documentation, + /// Returns expressions in the same order as input_exprs pub expressions: unsafe extern "C" fn( udwf: &Self, args: FFI_ExpressionArgs, ) -> SVec, + /// Serializes WindowFunction via DefaultLogicalExtensionCodec; + /// returns None variant if no simplification; only called when has_simplify=true pub simplify: unsafe extern "C" fn( udwf: &Self, window_function: SVec, schema: WrappedSchema, ) -> FFI_Result>>, + /// Returns FFI_ReversedUDWF enum; Reversed variant contains a cloned FFI_WindowUDF pub reverse_expr: unsafe extern "C" fn(udwf: &Self) -> FFI_ReversedUDWF, pub coerce_types: unsafe extern "C" fn( - udf: &Self, arg_types: SVec, ) -> FFI_Result>, pub sort_options: FFI_Option, + pub has_simplify: bool, + /// Used to create a clone on the provider of the udf. This should /// only need to be called by the receiver of the udf. 
pub clone: unsafe extern "C" fn(udf: &Self) -> Self, @@ -175,7 +180,9 @@ unsafe extern "C" fn field_fn_wrapper( } } -unsafe extern "C" fn documentation_fn_wrapper(udwf: &FFI_WindowUDF) -> *const Documentation { +unsafe extern "C" fn documentation_fn_wrapper( + udwf: &FFI_WindowUDF, +) -> *const Documentation { unsafe { let inner = udwf.inner(); match inner.documentation() { @@ -191,9 +198,15 @@ unsafe extern "C" fn expressions_fn_wrapper( ) -> SVec { unsafe { let inner = udwf.inner(); - let args = ForeignExpressionArgs::try_from(args).unwrap(); + let args = match ForeignExpressionArgs::try_from(args) { + Ok(args) => args, + Err(_) => return SVec::new(), + }; let expressions = inner.expressions((&args).into()); - expressions.into_iter().map(FFI_PhysicalExpr::from).collect() + expressions + .into_iter() + .map(FFI_PhysicalExpr::from) + .collect() } } @@ -206,20 +219,18 @@ unsafe extern "C" fn simplify_fn_wrapper( let inner = udwf.inner(); // 1. Deserialize bytes to Expr using Default codec - let protobuf = sresult_return!( - datafusion_proto::protobuf::LogicalExprNode::decode( + let protobuf = + sresult_return!(datafusion_proto::protobuf::LogicalExprNode::decode( window_function_bytes.as_ref() - ) - ); + )); let mut ctx = datafusion_execution::TaskContext::default(); // Register the wrapped UDWF so it can be resolved during deserialization sresult_return!(ctx.register_udwf(Arc::clone(inner))); let codec = datafusion_proto::logical_plan::DefaultLogicalExtensionCodec {}; - let expr = sresult_return!(datafusion_proto::logical_plan::from_proto::parse_expr( - &protobuf, - &ctx, - &codec - )); + let expr = + sresult_return!(datafusion_proto::logical_plan::from_proto::parse_expr( + &protobuf, &ctx, &codec + )); // 2. Extract WindowFunction from Expr let window_function = match expr { @@ -235,7 +246,8 @@ unsafe extern "C" fn simplify_fn_wrapper( // 4. 
Call inner.simplify() match inner.simplify() { Some(simplify_fn) => { - let simplified_expr = sresult_return!(simplify_fn(*window_function, &info)); + let simplified_expr = + sresult_return!(simplify_fn(*window_function, &info)); let protobuf = sresult_return!( datafusion_proto::logical_plan::to_proto::serialize_expr( &simplified_expr, @@ -312,6 +324,7 @@ unsafe extern "C" fn clone_fn_wrapper(udwf: &FFI_WindowUDF) -> FFI_WindowUDF { volatility: udwf.volatility.clone(), partition_evaluator: partition_evaluator_fn_wrapper, sort_options: udwf.sort_options.clone(), + has_simplify: udwf.has_simplify, coerce_types: coerce_types_fn_wrapper, field: field_fn_wrapper, documentation: documentation_fn_wrapper, @@ -342,6 +355,7 @@ impl From> for FFI_WindowUDF { let aliases = udf.aliases().iter().map(|a| a.to_owned().into()).collect(); let volatility = udf.signature().volatility.into(); let sort_options = udf.sort_options().map(|v| (&v).into()).into(); + let has_simplify = udf.inner().simplify().is_some(); let private_data = Box::new(WindowUDFPrivateData { udf }); @@ -351,6 +365,7 @@ impl From> for FFI_WindowUDF { volatility, partition_evaluator: partition_evaluator_fn_wrapper, sort_options, + has_simplify, coerce_types: coerce_types_fn_wrapper, field: field_fn_wrapper, documentation: documentation_fn_wrapper, @@ -454,7 +469,7 @@ impl WindowUDFImpl for ForeignWindowUDF { df_result!(evaluator).map(>::from) } -fn field(&self, field_args: WindowUDFFieldArgs) -> Result { + fn field(&self, field_args: WindowUDFFieldArgs) -> Result { unsafe { let input_types = vec_fieldref_to_rvec_wrapped(field_args.input_fields())?; let schema = df_result!((self.udf.field)( @@ -485,7 +500,11 @@ fn field(&self, field_args: WindowUDFFieldArgs) -> Result { expr_args: datafusion_expr::function::ExpressionArgs, ) -> Vec> { unsafe { - let args = FFI_ExpressionArgs::try_from(expr_args).unwrap(); + let fallback = expr_args.input_exprs().to_vec(); + let args = match FFI_ExpressionArgs::try_from(expr_args) { + 
Ok(args) => args, + Err(_) => return fallback, + }; (self.udf.expressions)(&self.udf, args) .into_iter() .map(|e| Arc::::from(&e)) @@ -494,46 +513,50 @@ fn field(&self, field_args: WindowUDFFieldArgs) -> Result { } fn simplify(&self) -> Option { + if !self.udf.has_simplify { + return None; + } + let udf = self.udf.clone(); Some(Box::new(move |wf, info| { let codec = datafusion_proto::logical_plan::DefaultLogicalExtensionCodec {}; - + // To serialize the window function let expr = Expr::WindowFunction(Box::new(wf)); let protobuf = datafusion_proto::logical_plan::to_proto::serialize_expr( - &expr, - &codec - ).map_err(|e| datafusion_common::DataFusionError::Plan(e.to_string()))?; - + &expr, &codec, + ) + .map_err(|e| datafusion_common::DataFusionError::Plan(e.to_string()))?; + let mut buffer = Vec::new(); Message::encode(&protobuf, &mut buffer) .map_err(|e| datafusion_common::DataFusionError::Plan(e.to_string()))?; let schema_ref: SchemaRef = Arc::new(info.schema().as_arrow().clone()); let schema = WrappedSchema::from(schema_ref); - + // Call the FFI function - let result = unsafe { - (udf.simplify)(&udf, buffer.into_iter().collect(), schema) - }; - + let result = + unsafe { (udf.simplify)(&udf, buffer.into_iter().collect(), schema) }; + let result: Option> = crate::df_result!(result)?.into(); - + match result { Some(bytes) => { - let protobuf = datafusion_proto::protobuf::LogicalExprNode::decode(bytes.as_slice()) - .map_err(|e| datafusion_common::DataFusionError::Plan(e.to_string()))?; + let protobuf = datafusion_proto::protobuf::LogicalExprNode::decode( + bytes.as_slice(), + ) + .map_err(|e| { + datafusion_common::DataFusionError::Plan(e.to_string()) + })?; let ctx = datafusion_execution::TaskContext::default(); - let simplified_expr = datafusion_proto::logical_plan::from_proto::parse_expr( - &protobuf, - &ctx, - &codec, - )?; + let simplified_expr = + datafusion_proto::logical_plan::from_proto::parse_expr( + &protobuf, &ctx, &codec, + )?; Ok(simplified_expr) } - 
None => { - Ok(expr) - } + None => Ok(expr), } })) } @@ -592,7 +615,9 @@ impl From for ReversedUDWF { FFI_ReversedUDWF::NotSupported => ReversedUDWF::NotSupported, FFI_ReversedUDWF::Reversed(ffi_udf) => { let udf_impl: Arc = (&ffi_udf).into(); - ReversedUDWF::Reversed(Arc::new(WindowUDF::new_from_shared_impl(udf_impl))) + ReversedUDWF::Reversed(Arc::new(WindowUDF::new_from_shared_impl( + udf_impl, + ))) } } } @@ -604,13 +629,13 @@ mod tests { use std::sync::Arc; use arrow::array::{ArrayRef, create_array}; + use arrow_schema::FieldRef; use datafusion::functions_window::lead_lag::{WindowShift, lag_udwf}; use datafusion::logical_expr::expr::Sort; use datafusion::logical_expr::{ExprFunctionExt, WindowUDF, WindowUDFImpl, col}; use datafusion::prelude::SessionContext; - use datafusion_expr::{Signature, PartitionEvaluator}; use datafusion_expr::function::WindowUDFFieldArgs; - use arrow_schema::FieldRef; + use datafusion_expr::{PartitionEvaluator, Signature}; use crate::tests::create_record_batch; use crate::udwf::{FFI_WindowUDF, ForeignWindowUDF}; @@ -693,7 +718,9 @@ mod tests { #[test] fn test_ffi_udwf_documentation() -> datafusion_common::Result<()> { - use datafusion_expr::{DocSection, Documentation, Volatility, function::PartitionEvaluatorArgs}; + use datafusion_expr::{ + DocSection, Documentation, Volatility, function::PartitionEvaluatorArgs, + }; #[derive(Debug, Clone, PartialEq, Eq, Hash)] struct MockUDWFWithDoc { @@ -702,14 +729,31 @@ mod tests { } impl WindowUDFImpl for MockUDWFWithDoc { - fn name(&self) -> &str { "mock_doc" } - fn signature(&self) -> &Signature { &self.signature } - fn partition_evaluator(&self, _: PartitionEvaluatorArgs) -> datafusion_common::Result> { unimplemented!() } - fn field(&self, _: WindowUDFFieldArgs) -> datafusion_common::Result { unimplemented!() } - fn documentation(&self) -> Option<&Documentation> { Some(&self.doc) } + fn name(&self) -> &str { + "mock_doc" + } + fn signature(&self) -> &Signature { + &self.signature + } + fn 
partition_evaluator( + &self, + _: PartitionEvaluatorArgs, + ) -> datafusion_common::Result> { + unimplemented!() + } + fn field( + &self, + _: WindowUDFFieldArgs, + ) -> datafusion_common::Result { + unimplemented!() + } + fn documentation(&self) -> Option<&Documentation> { + Some(&self.doc) + } } - let doc = Documentation::builder(DocSection::default(), "description", "syntax").build(); + let doc = Documentation::builder(DocSection::default(), "description", "syntax") + .build(); let original_udwf = Arc::new(WindowUDF::from(MockUDWFWithDoc { signature: Signature::any(0, Volatility::Immutable), doc: doc.clone(), @@ -725,7 +769,10 @@ mod tests { #[test] fn test_ffi_udwf_expressions() -> datafusion_common::Result<()> { use arrow::datatypes::DataType; - use datafusion_expr::{Volatility, function::{ExpressionArgs, PartitionEvaluatorArgs}}; + use datafusion_expr::{ + Volatility, + function::{ExpressionArgs, PartitionEvaluatorArgs}, + }; use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr::expressions::col; @@ -735,10 +782,24 @@ mod tests { } impl WindowUDFImpl for MockUDWFWithExprs { - fn name(&self) -> &str { "mock_exprs" } - fn signature(&self) -> &Signature { &self.signature } - fn partition_evaluator(&self, _: PartitionEvaluatorArgs) -> datafusion_common::Result> { unimplemented!() } - fn field(&self, _: WindowUDFFieldArgs) -> datafusion_common::Result { unimplemented!() } + fn name(&self) -> &str { + "mock_exprs" + } + fn signature(&self) -> &Signature { + &self.signature + } + fn partition_evaluator( + &self, + _: PartitionEvaluatorArgs, + ) -> datafusion_common::Result> { + unimplemented!() + } + fn field( + &self, + _: WindowUDFFieldArgs, + ) -> datafusion_common::Result { + unimplemented!() + } fn expressions(&self, args: ExpressionArgs) -> Vec> { args.input_exprs().iter().rev().cloned().collect() } @@ -762,7 +823,7 @@ mod tests { let mut ffi_udwf = FFI_WindowUDF::from(Arc::clone(&original_udwf)); ffi_udwf.library_marker_id = 
crate::mock_foreign_marker_id; let foreign_udwf: Arc = (&ffi_udwf).into(); - + let input_exprs = [expr_a, expr_b]; let args = ExpressionArgs::new(&input_exprs, &fields); let result = foreign_udwf.expressions(args); @@ -774,7 +835,11 @@ mod tests { #[test] fn test_ffi_udwf_simplify() -> datafusion_common::Result<()> { - use datafusion_expr::{Volatility, function::{PartitionEvaluatorArgs, WindowFunctionSimplification}, lit}; + use datafusion_expr::{ + Volatility, + function::{PartitionEvaluatorArgs, WindowFunctionSimplification}, + lit, + }; #[derive(Debug, Clone, PartialEq, Eq, Hash)] struct MockUDWFSimplify { @@ -782,10 +847,24 @@ mod tests { } impl WindowUDFImpl for MockUDWFSimplify { - fn name(&self) -> &str { "mock_simplify" } - fn signature(&self) -> &Signature { &self.signature } - fn partition_evaluator(&self, _: PartitionEvaluatorArgs) -> datafusion_common::Result> { unimplemented!() } - fn field(&self, _: WindowUDFFieldArgs) -> datafusion_common::Result { unimplemented!() } + fn name(&self) -> &str { + "mock_simplify" + } + fn signature(&self) -> &Signature { + &self.signature + } + fn partition_evaluator( + &self, + _: PartitionEvaluatorArgs, + ) -> datafusion_common::Result> { + unimplemented!() + } + fn field( + &self, + _: WindowUDFFieldArgs, + ) -> datafusion_common::Result { + unimplemented!() + } fn simplify(&self) -> Option { Some(Box::new(|_, _| Ok(lit(1)))) } @@ -796,10 +875,10 @@ mod tests { })); let mut ffi_udwf = FFI_WindowUDF::from(Arc::clone(&original_udwf)); ffi_udwf.library_marker_id = crate::mock_foreign_marker_id; - + let foreign_udwf: Arc = (&ffi_udwf).into(); let simplify_fn = foreign_udwf.simplify().unwrap(); - + let wf = datafusion_expr::expr::WindowFunction { fun: datafusion_expr::WindowFunctionDefinition::WindowUDF(original_udwf), params: datafusion_expr::expr::WindowFunctionParams { @@ -812,10 +891,12 @@ mod tests { distinct: false, }, }; - + let schema = arrow::datatypes::Schema::empty(); let df_schema = 
datafusion_common::DFSchema::try_from(schema).unwrap(); - let info = datafusion_expr::simplify::SimplifyContext::builder().with_schema(Arc::new(df_schema)).build(); + let info = datafusion_expr::simplify::SimplifyContext::builder() + .with_schema(Arc::new(df_schema)) + .build(); let simplified_expr = simplify_fn(wf, &info).unwrap(); assert_eq!(simplified_expr, lit(1)); @@ -825,7 +906,9 @@ mod tests { #[test] fn test_ffi_udwf_reverse_expr() -> datafusion_common::Result<()> { - use datafusion_expr::{ReversedUDWF, Volatility, function::PartitionEvaluatorArgs}; + use datafusion_expr::{ + ReversedUDWF, Volatility, function::PartitionEvaluatorArgs, + }; #[derive(Debug, Clone, PartialEq, Eq, Hash)] struct MockUDWFReverse { @@ -833,11 +916,27 @@ mod tests { } impl WindowUDFImpl for MockUDWFReverse { - fn name(&self) -> &str { "mock_reverse" } - fn signature(&self) -> &Signature { &self.signature } - fn partition_evaluator(&self, _: PartitionEvaluatorArgs) -> datafusion_common::Result> { unimplemented!() } - fn field(&self, _: WindowUDFFieldArgs) -> datafusion_common::Result { unimplemented!() } - fn reverse_expr(&self) -> ReversedUDWF { ReversedUDWF::Identical } + fn name(&self) -> &str { + "mock_reverse" + } + fn signature(&self) -> &Signature { + &self.signature + } + fn partition_evaluator( + &self, + _: PartitionEvaluatorArgs, + ) -> datafusion_common::Result> { + unimplemented!() + } + fn field( + &self, + _: WindowUDFFieldArgs, + ) -> datafusion_common::Result { + unimplemented!() + } + fn reverse_expr(&self) -> ReversedUDWF { + ReversedUDWF::Identical + } } let original_udwf = Arc::new(WindowUDF::from(MockUDWFReverse { @@ -847,14 +946,19 @@ mod tests { let mut ffi_udwf = FFI_WindowUDF::from(Arc::clone(&original_udwf)); ffi_udwf.library_marker_id = crate::mock_foreign_marker_id; let foreign_udwf: Arc = (&ffi_udwf).into(); - assert!(matches!(foreign_udwf.reverse_expr(), ReversedUDWF::Identical)); + assert!(matches!( + foreign_udwf.reverse_expr(), + 
ReversedUDWF::Identical + )); Ok(()) } #[test] fn test_ffi_udwf_reverse_expr_recursive() -> datafusion_common::Result<()> { - use datafusion_expr::{ReversedUDWF, Volatility, function::PartitionEvaluatorArgs}; + use datafusion_expr::{ + ReversedUDWF, Volatility, function::PartitionEvaluatorArgs, + }; #[derive(Debug, Clone, PartialEq, Eq, Hash)] struct MockUDWFRecursive { @@ -863,11 +967,27 @@ mod tests { } impl WindowUDFImpl for MockUDWFRecursive { - fn name(&self) -> &str { "mock_recursive" } - fn signature(&self) -> &Signature { &self.signature } - fn partition_evaluator(&self, _: PartitionEvaluatorArgs) -> datafusion_common::Result> { unimplemented!() } - fn field(&self, _: WindowUDFFieldArgs) -> datafusion_common::Result { unimplemented!() } - fn reverse_expr(&self) -> ReversedUDWF { ReversedUDWF::Reversed(Arc::clone(&self.reversed)) } + fn name(&self) -> &str { + "mock_recursive" + } + fn signature(&self) -> &Signature { + &self.signature + } + fn partition_evaluator( + &self, + _: PartitionEvaluatorArgs, + ) -> datafusion_common::Result> { + unimplemented!() + } + fn field( + &self, + _: WindowUDFFieldArgs, + ) -> datafusion_common::Result { + unimplemented!() + } + fn reverse_expr(&self) -> ReversedUDWF { + ReversedUDWF::Reversed(Arc::clone(&self.reversed)) + } } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -875,10 +995,24 @@ mod tests { signature: Signature, } impl WindowUDFImpl for MockUDWFSimple { - fn name(&self) -> &str { "mock_simple" } - fn signature(&self) -> &Signature { &self.signature } - fn partition_evaluator(&self, _: PartitionEvaluatorArgs) -> datafusion_common::Result> { unimplemented!() } - fn field(&self, _: WindowUDFFieldArgs) -> datafusion_common::Result { unimplemented!() } + fn name(&self) -> &str { + "mock_simple" + } + fn signature(&self) -> &Signature { + &self.signature + } + fn partition_evaluator( + &self, + _: PartitionEvaluatorArgs, + ) -> datafusion_common::Result> { + unimplemented!() + } + fn field( + &self, + _: 
WindowUDFFieldArgs, + ) -> datafusion_common::Result { + unimplemented!() + } } let reversed = Arc::new(WindowUDF::from(MockUDWFSimple { diff --git a/datafusion/ffi/tests/ffi_udwf.rs b/datafusion/ffi/tests/ffi_udwf.rs index 145a558208ff1..8ea3cbe7541b4 100644 --- a/datafusion/ffi/tests/ffi_udwf.rs +++ b/datafusion/ffi/tests/ffi_udwf.rs @@ -82,7 +82,8 @@ mod tests { fn partition_evaluator( &self, _: datafusion_expr::function::PartitionEvaluatorArgs, - ) -> Result> { + ) -> Result> + { unimplemented!() } fn field( @@ -96,12 +97,8 @@ mod tests { } } - let doc = Documentation::builder( - DocSection::default(), - "description", - "syntax", - ) - .build(); + let doc = Documentation::builder(DocSection::default(), "description", "syntax") + .build(); let original_udwf = Arc::new(WindowUDF::from(MockUDWFWithDoc { signature: datafusion::logical_expr::Signature::any( 0, From 027ea4b6a4ee1e10d5e0d3009bf062a338bc8a5f Mon Sep 17 00:00:00 2001 From: Brijesh-Thakkar Date: Tue, 2 Jun 2026 18:23:14 +0530 Subject: [PATCH 3/4] style(ffi): remove trailing whitespace in FFI_WindowUDF doc comment --- datafusion/ffi/src/udwf/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/ffi/src/udwf/mod.rs b/datafusion/ffi/src/udwf/mod.rs index fcb2ee25369f9..98790cf70511d 100644 --- a/datafusion/ffi/src/udwf/mod.rs +++ b/datafusion/ffi/src/udwf/mod.rs @@ -90,7 +90,7 @@ pub struct FFI_WindowUDF { args: FFI_ExpressionArgs, ) -> SVec, - /// Serializes WindowFunction via DefaultLogicalExtensionCodec; + /// Serializes WindowFunction via DefaultLogicalExtensionCodec; /// returns None variant if no simplification; only called when has_simplify=true pub simplify: unsafe extern "C" fn( udwf: &Self, From 9a24ce5ca8b8bf339fdd55ae1ed8b2420f559425 Mon Sep 17 00:00:00 2001 From: Brijesh-Thakkar Date: Wed, 3 Jun 2026 22:40:12 +0530 Subject: [PATCH 4/4] ci: fix docker image pull failure by using rust instead of amd64/rust --- .github/workflows/dependencies.yml | 4 +-- 
.github/workflows/extended.yml | 4 +-- .github/workflows/rust.yml | 40 +++++++++++++++--------------- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/.github/workflows/dependencies.yml b/.github/workflows/dependencies.yml index 2f3a127ef98c4..d53d7ce9bb5d9 100644 --- a/.github/workflows/dependencies.yml +++ b/.github/workflows/dependencies.yml @@ -39,7 +39,7 @@ jobs: name: Circular Dependency Check runs-on: ubuntu-latest container: - image: amd64/rust + image: rust steps: - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 with: @@ -58,7 +58,7 @@ jobs: name: Detect Unused Dependencies runs-on: ubuntu-latest container: - image: amd64/rust + image: rust steps: - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 - name: Install cargo-machete diff --git a/.github/workflows/extended.yml b/.github/workflows/extended.yml index a143cb49fd35b..89719db0bb846 100644 --- a/.github/workflows/extended.yml +++ b/.github/workflows/extended.yml @@ -108,7 +108,7 @@ jobs: name: cargo test hash collisions (amd64) runs-on: ${{ vars.USE_RUNS_ON == 'true' && format('runs-on={0},family=m8a+m7a+c8a,cpu=16,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion', github.run_id) || 'ubuntu-latest' }} container: - image: amd64/rust + image: rust steps: - uses: runs-on/action@d141ef83eb66d096ce8afc767e09115a65c63b60 # v2.1.2 - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 @@ -130,7 +130,7 @@ jobs: name: "Run sqllogictests with the sqlite test suite" runs-on: ${{ vars.USE_RUNS_ON == 'true' && format('runs-on={0},family=m8a+m7a+c8a,cpu=32,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion', github.run_id) || 'ubuntu-latest' }} container: - image: amd64/rust + image: rust steps: - uses: runs-on/action@d141ef83eb66d096ce8afc767e09115a65c63b60 # v2.1.2 - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 diff --git a/.github/workflows/rust.yml 
b/.github/workflows/rust.yml index f167117d5d146..dc4dd51569325 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -48,7 +48,7 @@ jobs: name: linux build test runs-on: ${{ vars.USE_RUNS_ON == 'true' && format('runs-on={0},family=m8a+m7a+c8a,cpu=8,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion', github.run_id) || 'ubuntu-latest' }} container: - image: amd64/rust + image: rust steps: - uses: runs-on/action@d141ef83eb66d096ce8afc767e09115a65c63b60 # v2.1.2 - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 @@ -77,7 +77,7 @@ jobs: needs: linux-build-lib runs-on: ubuntu-latest container: - image: amd64/rust + image: rust steps: - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 - name: Setup Rust toolchain @@ -102,7 +102,7 @@ jobs: needs: linux-build-lib runs-on: ${{ vars.USE_RUNS_ON == 'true' && format('runs-on={0},family=m8a+m7a+c8a,cpu=16,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion', github.run_id) || 'ubuntu-latest' }} container: - image: amd64/rust + image: rust steps: - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 - name: Setup Rust toolchain @@ -139,7 +139,7 @@ jobs: needs: linux-build-lib runs-on: ${{ vars.USE_RUNS_ON == 'true' && format('runs-on={0},family=m8a+m7a+c8a,cpu=16,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion', github.run_id) || 'ubuntu-latest' }} container: - image: amd64/rust + image: rust steps: - uses: runs-on/action@d141ef83eb66d096ce8afc767e09115a65c63b60 # v2.1.2 - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 @@ -171,7 +171,7 @@ jobs: needs: linux-build-lib runs-on: ${{ vars.USE_RUNS_ON == 'true' && format('runs-on={0},family=m8a+m7a+c8a,cpu=16,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion', github.run_id) || 'ubuntu-latest' }} container: - image: amd64/rust + image: rust steps: - uses: 
runs-on/action@d141ef83eb66d096ce8afc767e09115a65c63b60 # v2.1.2 - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 @@ -237,7 +237,7 @@ jobs: needs: linux-build-lib runs-on: ubuntu-latest container: - image: amd64/rust + image: rust steps: - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 - name: Setup Rust toolchain @@ -272,7 +272,7 @@ jobs: needs: linux-build-lib runs-on: ${{ vars.USE_RUNS_ON == 'true' && format('runs-on={0},family=m8a+m7a+c8a,cpu=16,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion', github.run_id) || 'ubuntu-latest' }} container: - image: amd64/rust + image: rust volumes: - /usr/local:/host/usr/local steps: @@ -352,7 +352,7 @@ jobs: needs: linux-build-lib runs-on: ${{ vars.USE_RUNS_ON == 'true' && format('runs-on={0},family=m8a+m7a+c8a,cpu=16,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion', github.run_id) || 'ubuntu-latest' }} container: - image: amd64/rust + image: rust steps: - uses: runs-on/action@d141ef83eb66d096ce8afc767e09115a65c63b60 # v2.1.2 - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 @@ -383,7 +383,7 @@ jobs: needs: linux-build-lib runs-on: ${{ vars.USE_RUNS_ON == 'true' && format('runs-on={0},family=m8a+m7a+c8a,cpu=16,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion', github.run_id) || 'ubuntu-latest' }} container: - image: amd64/rust + image: rust steps: - uses: runs-on/action@d141ef83eb66d096ce8afc767e09115a65c63b60 # v2.1.2 - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 @@ -405,7 +405,7 @@ jobs: needs: linux-build-lib runs-on: ${{ vars.USE_RUNS_ON == 'true' && format('runs-on={0},family=m8a+m7a+c8a,cpu=16,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion', github.run_id) || 'ubuntu-latest' }} container: - image: amd64/rust + image: rust steps: - uses: runs-on/action@d141ef83eb66d096ce8afc767e09115a65c63b60 # v2.1.2 - uses: 
actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 @@ -446,7 +446,7 @@ jobs: needs: linux-build-lib runs-on: ${{ vars.USE_RUNS_ON == 'true' && format('runs-on={0},family=m8a+m7a+c8a,cpu=16,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion', github.run_id) || 'ubuntu-latest' }} container: - image: amd64/rust + image: rust steps: - uses: runs-on/action@d141ef83eb66d096ce8afc767e09115a65c63b60 # v2.1.2 - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 @@ -480,7 +480,7 @@ jobs: needs: linux-build-lib runs-on: ${{ vars.USE_RUNS_ON == 'true' && format('runs-on={0},family=m8a+m7a+c8a,cpu=16,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion', github.run_id) || 'ubuntu-latest' }} container: - image: amd64/rust + image: rust services: postgres: image: postgres:15 @@ -519,7 +519,7 @@ jobs: needs: linux-build-lib runs-on: ${{ vars.USE_RUNS_ON == 'true' && format('runs-on={0},family=m8a+m7a+c8a,cpu=16,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion', github.run_id) || 'ubuntu-latest' }} container: - image: amd64/rust + image: rust steps: - uses: runs-on/action@d141ef83eb66d096ce8afc767e09115a65c63b60 # v2.1.2 - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 @@ -579,7 +579,7 @@ jobs: name: Verify Vendored Code runs-on: ubuntu-latest container: - image: amd64/rust + image: rust steps: - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 - name: Setup Rust toolchain @@ -596,7 +596,7 @@ jobs: name: Check cargo fmt runs-on: ubuntu-latest container: - image: amd64/rust + image: rust steps: - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 - name: Setup Rust toolchain @@ -654,7 +654,7 @@ jobs: needs: linux-build-lib runs-on: ${{ vars.USE_RUNS_ON == 'true' && format('runs-on={0},family=m8a+m7a+c8a,cpu=16,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion', github.run_id) || 'ubuntu-latest' }} container: - 
image: amd64/rust + image: rust steps: - uses: runs-on/action@d141ef83eb66d096ce8afc767e09115a65c63b60 # v2.1.2 - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 @@ -680,7 +680,7 @@ jobs: needs: linux-build-lib runs-on: ubuntu-latest container: - image: amd64/rust + image: rust steps: - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 with: @@ -701,7 +701,7 @@ jobs: needs: linux-build-lib runs-on: ${{ vars.USE_RUNS_ON == 'true' && format('runs-on={0},family=m8a+m7a+c8a,cpu=16,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion', github.run_id) || 'ubuntu-latest' }} container: - image: amd64/rust + image: rust steps: - uses: runs-on/action@d141ef83eb66d096ce8afc767e09115a65c63b60 # v2.1.2 - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 @@ -736,7 +736,7 @@ jobs: needs: linux-build-lib runs-on: ubuntu-latest container: - image: amd64/rust + image: rust steps: - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 @@ -767,7 +767,7 @@ jobs: name: Verify MSRV (Min Supported Rust Version) runs-on: ubuntu-latest container: - image: amd64/rust + image: rust steps: - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 - name: Setup Rust toolchain