-
Notifications
You must be signed in to change notification settings - Fork 2.2k
feat: add DataFrame fill_nan #22702
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
feat: add DataFrame fill_nan #22702
Changes from all commits
3b9c184
be9e1d4
6fe477a
28b1e72
b5eaa14
b70c962
eb79a11
cb13967
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -58,13 +58,11 @@ use datafusion_common::{ | |
| }; | ||
| use datafusion_expr::select_expr::SelectExpr; | ||
| use datafusion_expr::{ | ||
| ExplainOption, SortExpr, TableProviderFilterPushDown, UNNAMED_TABLE, case, | ||
| dml::InsertOp, | ||
| expr::{Alias, ScalarFunction}, | ||
| is_null, lit, | ||
| utils::COUNT_STAR_EXPANSION, | ||
| ExplainOption, ScalarUDF, SortExpr, TableProviderFilterPushDown, UNNAMED_TABLE, case, | ||
| dml::InsertOp, is_null, lit, utils::COUNT_STAR_EXPANSION, | ||
| }; | ||
| use datafusion_functions::core::coalesce; | ||
| use datafusion_functions::math::nanvl; | ||
| use datafusion_functions_aggregate::expr_fn::{ | ||
| avg, count, max, median, min, stddev, sum, | ||
| }; | ||
|
|
@@ -2471,6 +2469,65 @@ impl DataFrame { | |
| &self, | ||
| value: ScalarValue, | ||
| columns: Vec<String>, | ||
| ) -> Result<DataFrame> { | ||
| self.fill_columns(&value, &columns, &coalesce(), |_| true) | ||
| } | ||
|
|
||
| // Helper: looks up each name in the logical plan's schema (passing `None` as the first lookup argument, presumably "no table qualifier") and returns the matching fields; any failed lookup is reported as a "Column '<name>' not found" plan error | ||
| fn find_columns(&self, names: &[impl AsRef<str>]) -> Result<Vec<FieldRef>> { | ||
| let schema = self.logical_plan().schema(); | ||
| names | ||
| .iter() | ||
| .map(|name| { | ||
| let name = name.as_ref(); | ||
| schema | ||
| .field_with_name(None, name) | ||
| .cloned() | ||
| .map_err(|_| plan_datafusion_err!("Column '{}' not found", name)) | ||
| }) | ||
| .collect() | ||
| } | ||
|
|
||
| /// Fill NaN values in the specified floating-point columns with a given value. | ||
| /// If `columns` is empty, the fill is applied across all columns. | ||
| /// Only floating-point columns are rewritten (using `nanvl`); other columns are left unchanged. | ||
| /// A column is also left unchanged when `value` cannot be cast to the column's type. | ||
| /// | ||
| /// # Arguments | ||
| /// * `value` - Value to fill NaNs with | ||
| /// * `columns` - Column names to fill; a name that cannot be resolved yields a "Column '...' not found" error. If empty, fills all columns. | ||
| /// | ||
| /// # Example | ||
| /// ``` | ||
| /// # use datafusion::prelude::*; | ||
| /// # use datafusion::error::Result; | ||
| /// # use datafusion_common::ScalarValue; | ||
| /// # #[tokio::main] | ||
| /// # async fn main() -> Result<()> { | ||
| /// let ctx = SessionContext::new(); | ||
| /// let df = ctx | ||
| /// .read_csv("tests/data/example.csv", CsvReadOptions::new()) | ||
| /// .await?; | ||
| /// // Fill NaN in only columns "a" and "c": | ||
| /// let df = df.fill_nan(ScalarValue::from(0.0), &["a", "c"])?; | ||
| /// // Fill NaN across all columns: | ||
| /// let df = df.fill_nan(ScalarValue::from(0.0), &[])?; | ||
| /// # Ok(()) | ||
| /// # } | ||
| /// ``` | ||
| #[expect(clippy::needless_pass_by_value)] | ||
| pub fn fill_nan(&self, value: ScalarValue, columns: &[&str]) -> Result<DataFrame> { | ||
| self.fill_columns(&value, columns, &nanvl(), |field| { | ||
| field.data_type().is_floating() | ||
| }) | ||
| } | ||
|
|
||
| fn fill_columns( | ||
| &self, | ||
| value: &ScalarValue, | ||
| columns: &[impl AsRef<str>], | ||
| func: &Arc<ScalarUDF>, | ||
| applies: impl Fn(&FieldRef) -> bool, | ||
| ) -> Result<DataFrame> { | ||
| let cols = if columns.is_empty() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I noticed
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Thanks for the review. Done in 6fe477a. |
||
| self.logical_plan() | ||
|
|
@@ -2480,28 +2537,21 @@ impl DataFrame { | |
| .map(Arc::clone) | ||
| .collect() | ||
| } else { | ||
| self.find_columns(&columns)? | ||
| self.find_columns(columns)? | ||
| }; | ||
|
|
||
| // Create projections for each column | ||
| let projections = self | ||
| .logical_plan() | ||
| .schema() | ||
| .fields() | ||
| .iter() | ||
| .map(|field| { | ||
| if cols.contains(field) { | ||
| if cols.contains(field) && applies(field) { | ||
| // Try to cast fill value to column type. If the cast fails, fallback to the original column. | ||
| match value.clone().cast_to(field.data_type()) { | ||
| Ok(fill_value) => Expr::Alias(Alias { | ||
| expr: Box::new(Expr::ScalarFunction(ScalarFunction { | ||
| func: coalesce(), | ||
| args: vec![col(field.name()), lit(fill_value)], | ||
| })), | ||
| relation: None, | ||
| name: field.name().to_string(), | ||
| metadata: None, | ||
| }), | ||
| Ok(fill_value) => func | ||
| .call(vec![col(field.name()), lit(fill_value)]) | ||
| .alias(field.name()), | ||
| Err(_) => col(field.name()), | ||
| } | ||
| } else { | ||
|
|
@@ -2513,20 +2563,6 @@ impl DataFrame { | |
| self.clone().select(projections) | ||
|
|
||
| } | ||
|
|
||
| // Helper: looks up each name in the logical plan's schema and returns the matching fields; any failed lookup is reported as a "Column '<name>' not found" plan error | ||
| fn find_columns(&self, names: &[String]) -> Result<Vec<FieldRef>> { | ||
| let schema = self.logical_plan().schema(); | ||
| names | ||
| .iter() | ||
| .map(|name| { | ||
| schema | ||
| .field_with_name(None, name) | ||
| .cloned() | ||
| .map_err(|_| plan_datafusion_err!("Column '{}' not found", name)) | ||
| }) | ||
| .collect() | ||
| } | ||
|
|
||
| /// Find qualified columns for this dataframe from names | ||
| /// | ||
| /// # Arguments | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we switch `columns` to something more like `&[&str]` or even `&[impl Into<Column>]`, like the other functions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review. I modeled `fill_nan` on the existing `fill_null`, which also takes `Vec<String>`, so changing just `fill_nan` would leave the two siblings inconsistent.
I agree with aligning them better, but changing `fill_null`'s signature is a breaking change (e.g. `df.fill_null(val, vec!["a".to_string()])`). So maybe we should not bundle it into this PR? We could keep `fill_nan` matching `fill_null` here and migrate both in a follow-up.
Also, `unnest_columns` uses `&[&str]` while `drop_columns` uses `&[impl Into<Column>]`. Do we have any preference for which one to standardize on?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer trying to get the API right from the start, since, as you point out, changing it later can be a breaking change (technically we can avoid that if we do the follow-up within the same release window).
Requiring a `Vec<String>` is a bit unwieldy, which I think is what the clippy lint is trying to tell us?
I think it would be good to try to get `&[impl Into<Column>]` to work if possible, since that allows `&[&str]` to work as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good. I'll update `fill_nan` to `&[impl Into<Column>]`.
As for `fill_null`, it is already public, so aligning its signature is a genuine breaking change. I'll open a separate PR for it with the api-change label and an upgrade-guide note. Happy to aim for the same release.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
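Update: I used `&[&str]` instead of `&[impl Into<Column>]`.
The blocker was the "all columns" case: with the generic signature, an empty slice can't infer its element type, so a call such as `df.fill_nan(ScalarValue::from(0.0), &[])` needs an explicit type annotation.
Making callers annotate `&[]` for the common case didn't seem worth it just to accept `Into<Column>`. With `&[&str]`, both cases stay clean: `df.fill_nan(ScalarValue::from(0.0), &["a", "c"])` and `df.fill_nan(ScalarValue::from(0.0), &[])`.
What do you think?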
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this works, thanks