fix: reject incompatible decimal precision/scale in native_datafusion scan #4090
andygrove wants to merge 1 commit into apache:main
Conversation
… scan

The `native_datafusion` Spark physical expression adapter previously fell through to a Spark `Cast` for decimal-to-decimal type changes, which silently rescales or truncates values that should have raised an error. Mirror Spark's `TypeUtil.isDecimalTypeMatched` (Spark 3.x rule) by rejecting reads where the target precision is smaller than the source precision or the scales differ. Closes apache#4089.
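Stated as a predicate, the rule this commit message cites is small. A minimal sketch of it as phrased above; the function name and standalone signature are illustrative, not Comet's or Spark's actual API:

```rust
/// Mirrors the Spark 3.x TypeUtil.isDecimalTypeMatched rule as the commit
/// message states it: a requested decimal read type is compatible only if
/// it loses no precision and keeps the same scale.
fn is_decimal_type_matched(
    src_precision: u8,
    src_scale: i8,
    dst_precision: u8,
    dst_scale: i8,
) -> bool {
    dst_precision >= src_precision && dst_scale == src_scale
}
```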
Force-pushed from 1194d82 to 99e1235
In a case where we expect an exception to be generated anyway, can we catch this at `CometScanRule` rather than going all the way to serialization and native operators?

I think the issue is that we do not know the types of all the parquet files until runtime?

IIRC from looking at this a while back, Spark has read the physical schema already, but thrown it away by the time our Comet rules run with no good way to get it again. I'm not opposed to handling it this way, just wanted to think through if we could catch it earlier. It's also a fairly uncommon scenario.

I do think this is an edge case that is fairly unlikely IRL because it only happens when the user provides a schema that is incompatible with the file schema.
Which issue does this PR close?
Closes #4089.
Rationale for this change
When the `native_datafusion` scan reads a Parquet column whose physical type is `Decimal(p1, s1)` under a requested read schema of `Decimal(p2, s2)` with `s2 < s1`, the existing schema adapter falls through to Spark's `Cast` expression. `Cast` happily truncates fractional digits, silently producing wrong values. Spark's vectorized reader rejects this with `SchemaColumnConvertNotSupportedException`, and `native_iceberg_compat` already does the same via `TypeUtil.checkParquetType`. The native scan should match.

What changes are included in this PR?
`native/core/src/parquet/schema_adapter.rs`: in `replace_with_spark_cast`, add a guard before the existing branches that returns `DataFusionError::Plan` when both `physical_type` and `target_type` are `Decimal128` and the target scale is smaller than the source scale.

The check is intentionally narrow: precision-only narrowings (e.g. `Decimal(5, 2)` read as `Decimal(3, 2)`) are still allowed and fall through to Spark's `Cast`, which produces null on per-value overflow. This matches Spark 4.0's parquet-mr fallback behavior exercised by `ParquetTypeWideningSuite`'s `parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr` test.
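A minimal sketch of what such a guard could look like, assuming the adapter compares Arrow `DataType`s for the file (physical) and requested (target) schemas; the helper name, signature, and error message here are hypothetical, not the PR's actual code:

```rust
use arrow::datatypes::DataType;
use datafusion::common::DataFusionError;

/// Hypothetical guard: reject a Decimal128 -> Decimal128 read when the
/// requested scale is smaller than the file's scale, instead of falling
/// through to Spark's Cast (which would silently truncate).
fn check_decimal_rescale(
    physical_type: &DataType,
    target_type: &DataType,
) -> Result<(), DataFusionError> {
    if let (
        DataType::Decimal128(src_precision, src_scale),
        DataType::Decimal128(dst_precision, dst_scale),
    ) = (physical_type, target_type)
    {
        if dst_scale < src_scale {
            return Err(DataFusionError::Plan(format!(
                "cannot read Parquet decimal({src_precision}, {src_scale}) \
                 as decimal({dst_precision}, {dst_scale}): narrowing the \
                 scale would silently truncate fractional digits"
            )));
        }
        // Precision-only narrowing (e.g. Decimal(5, 2) read as
        // Decimal(3, 2)) deliberately falls through to Spark's Cast,
        // which yields null on per-value overflow.
    }
    Ok(())
}
```

Per the test described below, the planner error surfaces on the JVM side as a `SparkException`.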
How are these changes tested?

Added a focused test to `ParquetReadSuite`: `native_datafusion rejects incompatible decimal precision/scale`. It writes `Decimal(10, 2)` data, reads it under `Decimal(5, 0)` (scale narrowed from 2 to 0), forces `spark.comet.scan.impl=native_datafusion` and `spark.sql.sources.useV1SourceList=parquet`, and asserts `collect()` raises `SparkException`. Verified against `ParquetReadV1Suite` (44 tests, all pass; 1 pre-existing test ignored).

The behavior is also covered by the per-impl matrix added in #4087 (`decimal(10,2) read as decimal(5,0): native_datafusion`), whose assertion will need flipping from "succeeds" to "throws" once that PR merges.
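On the Rust side, the guard sketched earlier could also be exercised directly. A hypothetical unit test using the same shapes as the Scala test above (`Decimal(10, 2)` read as `Decimal(5, 0)`) plus the allowed precision-only case:

```rust
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::DataType;

    #[test]
    fn scale_narrowing_is_rejected() {
        // Decimal(10, 2) file read under Decimal(5, 0): scale 2 -> 0.
        let physical = DataType::Decimal128(10, 2);
        let target = DataType::Decimal128(5, 0);
        assert!(check_decimal_rescale(&physical, &target).is_err());
    }

    #[test]
    fn precision_only_narrowing_is_allowed() {
        // Decimal(5, 2) read as Decimal(3, 2) falls through to Cast.
        let physical = DataType::Decimal128(5, 2);
        let target = DataType::Decimal128(3, 2);
        assert!(check_decimal_rescale(&physical, &target).is_ok());
    }
}
```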