Support lower and upper scalar udf on dict arrays #22647
Jefffrey
left a comment
Can we have an SLT case to prove it works end to end (to ensure it works with the signature code too)?
@Jefffrey if I'm not mistaken, we already have SLT tests for this:

datafusion/datafusion/sqllogictest/test_files/functions.slt Lines 513 to 516 in 00c35d0
datafusion/datafusion/sqllogictest/test_files/functions.slt Lines 433 to 436 in 00c35d0

Invoking these functions doesn't fail when we invoke them on dictionary columns using the SQL API. For example:

    use arrow::{
        array::{DictionaryArray, RecordBatch, StringArray, UInt8Array},
        util::pretty::print_batches,
    };
    use arrow_schema::{DataType, Field, Schema};
    use datafusion::{catalog::MemTable, functions::expr_fn::lower, prelude::SessionContext};
    use datafusion::{logical_expr::col, physical_expr::create_physical_expr};
    use datafusion_common::DFSchema;
    use std::sync::Arc;

    #[tokio::main]
    async fn main() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, false),
            Field::new(
                "b",
                DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
                false,
            ),
        ]));
        let rb = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from_iter_values(["a", "aa", "b"])),
                Arc::new(DictionaryArray::new(
                    UInt8Array::from_iter_values([0, 1, 0]),
                    Arc::new(StringArray::from_iter_values(["a", "B"])),
                )),
            ],
        )
        .unwrap();

        // --- Invoking through the SQL API works ---
        let ctx = SessionContext::new();
        ctx.register_table(
            "tab1",
            Arc::new(MemTable::try_new(schema.clone(), vec![vec![rb.clone()]]).unwrap()),
        )
        .unwrap();
        let df = ctx.sql("select lower(b) from tab1").await.unwrap();
        let batches = df.collect().await.unwrap();
        print_batches(&batches).unwrap();

        // --- Invoking through the expr API does not work ---
        let physical_expr = create_physical_expr(
            &lower(col("b")),
            &DFSchema::try_from(schema).unwrap(),
            &ctx.state().execution_props(),
        )
        .unwrap();
        let expr_result = physical_expr.evaluate(&rb).unwrap(); // panics
        println!("{:?}", expr_result);
    }

I think this is because when planning, we coerce the field to Utf8View. The |
The SLTs should at least check the dictionary type is preserved (e.g. check via |
Maybe I should just create a new issue instead of linking to #20935. My intention here wasn't actually to preserve dictionary encoding necessarily. This issue just seemed like the most relevant one that was already open. My real intention here was just to stop these functions returning an error when invoked with a Dictionary arg. Would you find that acceptable @Jefffrey?
I don't understand what this PR is fixing in that case, considering you've linked to some existing SLTs which call these functions with dictionary arrays?
@Jefffrey I linked the SLTs because you asked about adding them in this comment #22647 (review) ...

What this PR fixes is: if these scalar UDFs are invoked directly using an expression (e.g. not using SQL), they will return an error if the argument is a dictionary:

    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new(
            "b",
            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
            false,
        ),
    ]));
    let rb = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(StringArray::from_iter_values(["a", "aa", "b"])),
            Arc::new(DictionaryArray::new(
                UInt8Array::from_iter_values([0, 1, 0]),
                Arc::new(StringArray::from_iter_values(["a", "B"])),
            )),
        ],
    )
    .unwrap();
    // `ctx` is the SessionContext created in the full example.
    let physical_expr = create_physical_expr(
        &lower(col("b")),
        &DFSchema::try_from(schema).unwrap(),
        &ctx.state().execution_props(),
    )
    .unwrap();
    let expr_result = physical_expr.evaluate(&rb).unwrap(); // does not succeed
    println!("{:?}", expr_result);

(see full example here)
Is it a common use case to be constructing the physical expression directly without going through the logical optimizer which handles signature coercion? |
I can't speak to how common it is generally, but this is what is done in the codebase I work on. Currently we work around this by casting the dictionary arrays to a UTF-8 array before invoking the expression. We'd like to avoid this unnecessary conversion. With the changes introduced in this PR, I imagine you'd also be able to avoid the cast added by the optimizer's coercion, which you may find interesting from a performance perspective as well.
What I'm trying to highlight is that this PR only adds this path for physical execution, meaning it is unreachable via normal SQL execution, which goes through the optimizer path, because the signatures of the UDFs still coerce away from dictionary (hence why I wanted an end-to-end SLT example to prove this works). Also, taking another look, this physical execution path is implemented only for the array path and misses the scalar path.
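To make the array path versus scalar path distinction concrete, here is a minimal sketch using toy types. `ToyDict`, `ToyScalar`, `ToyColumnarValue`, `lower_scalar` and `lower_columnar` are hypothetical names invented for illustration; they are only loosely modelled on DataFusion's `ColumnarValue` and `ScalarValue` and are not the real API or the PR's implementation.

```rust
/// Toy dictionary-encoded string column: each key indexes into `values`.
/// Illustrative only; not the arrow-rs `DictionaryArray`.
#[derive(Debug, Clone, PartialEq)]
struct ToyDict {
    keys: Vec<usize>,
    values: Vec<String>,
}

/// Toy scalar: either a plain string or a dictionary-typed wrapper
/// around an inner scalar.
#[derive(Debug, Clone, PartialEq)]
enum ToyScalar {
    Utf8(String),
    Dictionary(Box<ToyScalar>),
}

/// Toy counterpart of a value that is either a whole column or one scalar.
#[derive(Debug, Clone, PartialEq)]
enum ToyColumnarValue {
    Array(ToyDict),
    Scalar(ToyScalar),
}

/// Scalar path: unwrap the dictionary wrapper, convert the inner value,
/// and re-wrap it so the dictionary type is kept.
fn lower_scalar(scalar: &ToyScalar) -> ToyScalar {
    match scalar {
        ToyScalar::Utf8(s) => ToyScalar::Utf8(s.to_lowercase()),
        ToyScalar::Dictionary(inner) => {
            ToyScalar::Dictionary(Box::new(lower_scalar(inner)))
        }
    }
}

/// Handles both paths. Covering only the `Array` arm would leave
/// dictionary-typed scalars unsupported.
fn lower_columnar(value: &ToyColumnarValue) -> ToyColumnarValue {
    match value {
        // Array path: convert the dictionary values, reuse the keys.
        ToyColumnarValue::Array(dict) => ToyColumnarValue::Array(ToyDict {
            keys: dict.keys.clone(),
            values: dict.values.iter().map(|s| s.to_lowercase()).collect(),
        }),
        // Scalar path.
        ToyColumnarValue::Scalar(s) => ToyColumnarValue::Scalar(lower_scalar(s)),
    }
}
```

In this toy model, both arms return a value that still carries the dictionary shape, which is the property an end-to-end test would want to check.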
Thanks @Jefffrey, that helps clarify things quite a bit. Sorry for the confusion. I feel we were thinking about this change from slightly different perspectives. For my use case, I only needed this corrected for physical expr evaluation, so I wasn't thinking about this from the SQL execution perspective. I can make the changes to have this path invoked for SQL execution as well, as you've suggested.

Do you know offhand if there are currently any UDFs with coercible signatures that accept dictionary arrays? I browsed briefly and didn't see any (maybe I'm missing something). I'm thinking that maybe we'll need to add some new capabilities to the logical type or type signature module to get this working.

Currently, we're using
And the only implementation we have of this is for NativeType
My tentative plan would be to create an implementation of |
Which issue does this PR close?
(if we need a dedicated issue for this PR specifically, happy to create one)
Rationale for this change
Have the lower and upper scalar UDFs capable of converting the case of dictionary arrays.
Before this change, callers of these scalar UDFs would have to convert their columns to non-dictionary encoded arrays, an overhead they may wish to avoid.
What changes are included in this PR?
Calls the case_conversion function recursively on dictionary values if the input is a dictionary array.
Are these changes tested?
Yes, unit tests
Are there any user-facing changes?
No
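
The recursive approach described under "What changes are included in this PR?" can be sketched roughly as follows. This is a standalone toy model, not the PR's code: `ToyArray` and the functions below are hypothetical stand-ins written with the standard library only, and the `case_conversion` here merely borrows the name used in the description.

```rust
/// Toy stand-in for an Arrow array. Illustrative only; these are not the
/// arrow-rs or DataFusion types.
#[derive(Debug, Clone, PartialEq)]
enum ToyArray {
    /// Plain string column, one entry per row (`None` is null).
    Utf8(Vec<Option<String>>),
    /// Dictionary column: each key indexes into `values`.
    Dictionary {
        keys: Vec<Option<usize>>,
        values: Box<ToyArray>,
    },
}

/// Applies `op` to every string. For a dictionary, it recurses into the
/// values and copies the keys unchanged, so the encoding is preserved and
/// each distinct value is converted once rather than once per row.
fn case_conversion(array: &ToyArray, op: fn(&str) -> String) -> ToyArray {
    match array {
        ToyArray::Utf8(rows) => {
            ToyArray::Utf8(rows.iter().map(|v| v.as_deref().map(op)).collect())
        }
        ToyArray::Dictionary { keys, values } => ToyArray::Dictionary {
            keys: keys.clone(),
            values: Box::new(case_conversion(values, op)),
        },
    }
}

fn lower(array: &ToyArray) -> ToyArray {
    case_conversion(array, str::to_lowercase)
}

fn upper(array: &ToyArray) -> ToyArray {
    case_conversion(array, str::to_uppercase)
}
```

One consequence of converting only the values is that the resulting dictionary can contain duplicates (for example, "a" and "A" both become "a" under lower). The sketch leaves such duplicates in place rather than rebuilding the keys.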