perf(upsert): prune destination scan via df partition-column ranges a…#3387
perf(upsert): prune destination scan via df partition-column ranges a…#3387paultmathew wants to merge 1 commit into
Conversation
…nd project join_cols only Two complementary optimizations to ``Transaction.upsert`` for tables whose partition spec sources from columns NOT in ``join_cols`` (a common pattern for append-only event logs partitioned by time but keyed by composite IDs): 1. Partition-range augmentation: ``upsert_util.augment_filter_with_partition_ranges`` derives ``[min, max]`` predicates from ``df`` for every partition source column present in the frame and ANDs them into the row filter built by ``create_match_filter``. ``inclusive_projection`` then projects each range through the partition transform at scan plan time, enabling manifest- and file-level pruning that the key-only filter can't trigger. 2. Column-projection for the insert-only path: when ``when_matched_update_all=False`` the consumer loop only reads ``join_cols`` off each destination batch. Passing ``selected_fields=tuple(join_cols)`` to ``DataScan`` lets the parquet reader prune wide non-key columns. The existing ``_projected_field_ids`` auto-union with row-filter columns keeps the partition-range predicate's data accessible. Correctness guards skip the augmentation per-column when the source column is absent from df, entirely null, or partially null (a non-null range predicate would exclude NULL-partition destination rows whose keys may collide with the null-partition source rows). Related to apache#2138, apache#2159, apache#3129. Complementary to (closed-stale) apache#2943's "coarse match filter" approach: that PR shrinks the row predicate itself; this one adds partition pruning the row predicate can't trigger on its own. Co-authored-by: Cursor <cursoragent@cursor.com>
abnobdoss
left a comment
There was a problem hiding this comment.
Hey Paul, this looks very interesting! I've faced similar challenges around iceberg upsert performance in Python so I'm very keen to make some traction clearing out some of the bottlenecks!
That said, I'm worried this approach treats the partition column as an implicit join key, even when it isn't part of join_cols - and in the current version that doesn't seem to be safe. As the scenario on test_upsert.py:1158 shows: if a target row has the same order_id as a source row but a different order_date (status correction, late edit, versioned overwrite), the augmentation prunes the target's file out of the scan and the upsert emits a duplicate insert instead of an update. The augmentation effectively requires the partition column to be stable for a given join key - a contract the upsert API doesn't surface anywhere today.
If this does land, I think the API contract change should be opt-in (a new flag) rather than a default behavior change, to avoid silently breaking existing users. Let me know if I'm missing something.
| [ | ||
| {"order_id": 1, "order_date": datetime.date(2026, 1, 1), "order_type": "A"}, | ||
| {"order_id": 2, "order_date": datetime.date(2026, 1, 1), "order_type": "A"}, | ||
| {"order_id": 3, "order_date": datetime.date(2026, 1, 2), "order_type": "A"}, |
There was a problem hiding this comment.
Does this still pass if this target row keeps order_id=3 but changes order_date to datetime.date(2026, 5, 1)?
The quirk with this change is that it seems to assume the partition column is part of the row identity. I’m not sure when that’s valid outside cases where the partition is derived from the join key, and in those cases I’d expect the join-key filter to already be sufficient for pruning.
|
@abnobdoss Thanks for the review. I agree that this would break things for existing users. I'll update the PR to gate the augmentation on a structural check: augment only when every partition source column is in join_cols (directly or via a deterministic transform on a join column). When that holds, the partition value is a function of the join key, so the augmentation can never exclude a destination file holding the matching key — correctness is preserved by construction. When it doesn't, the row filter is left unchanged and tx.upsert falls back to today's full-table-scan behaviour. The optimisation still covers the workload that motivated the PR — high-cardinality state tables with bucket(N, key) + unique_keys=[key], where partition source ⊆ join_cols by construction. I'll swap the smoke-test scenario to that shape so the benchmarks reflect the case the augmentation is actually safe for. The underlying limitation — "find the row with key=X regardless of which partition holds it" — fundamentally requires either a full-table scan (today's tx.upsert behaviour without augmentation) or a delete-by-value primitive. Equality deletes by primary key are the only Iceberg-native mechanism I see that would let pyiceberg do partition-migration-correct upserts without scanning every file. I know the discussion at #3270 leans toward "don't write equality deletes, write deletion vectors instead", but deletion vectors are still position-keyed — they reduce the rewrite footprint once you've located a row, but they don't help with the lookup itself. So for the partition-migration class of upsert bug specifically, deletion vectors aren't the right tool. @Fokko @kevinjqliu I take the maintainer concerns about equality-delete writes (compaction obligation, read-side merge cost, the rewrite-equality-to-position story) — but I think for the upsert workload the trade-off goes the other way. Happy to take a stab at an equality-delete write path in a separate PR, scoped initially to the cases tx.upsert could opt into, if there's appetite for revisiting that direction. |
|
Hey @paultmathew, thanks for the clarification. I’m curious whether your use case still has performance problems when the partition source column is included in My assumption is that in that case |
|
@abnobdoss You're right. My goal was to replace the Or(And(EqualTo, ...)) predicate with an O(1) range so the projection visitor would do less work, but I didn't account for the fact that inclusive_projection walks the full N-disjunct tree regardless of what I AND onto it — so the augmentation adds work rather than substituting for it, and create_match_filter's own per-disjunct projection is already producing the prune target. I'll drop the partition augmentation and keep just the selected_fields=tuple(join_cols) change. The column-projection win is independent of partition spec and stands on its own. The underlying gap that motivated the augmentation — large flushes against partition-source-not-in-join-cols tables hitting full-table scans — is a separate problem that needs a different primitive. Equality deletes by primary key are the cleanest Iceberg-native fit for it, so I'll follow up on that thread. Will trim the diff and update the PR description shortly. |
Rationale for this change
Transaction.upsertbuilds its scanrow_filterfromjoin_colsalone via
create_match_filter. When the partition spec sources fromcolumns NOT in
join_cols— a common pattern for append-only eventlogs partitioned by time but keyed by composite IDs — two amplifying
problems fall out at scan plan time:
inclusive_projectioncollapses the predicate toAlwaysTrueagainst the partition spec, so partition pruning never fires and
every file in the table is listed (related: Upsertion memory usage grows exponentially as table size grows #2138, Upserting large table extremely slow #2159, Upsert with 1M rows extremely slow due to
create_match_filterandtxn.delete()performance #3129).Or(And(EqualTo, EqualTo), …)predicate on UUID-shaped key columns can't prune either —
per-file
lower_bound/upper_boundstats span essentially thefull UUID space, so the metrics evaluator passes every file.
The result is a full-table scan at every upsert. For tables with 10k+
partitions this is multi-minute / multi-gigabyte work per call.
What this PR does
Two complementary optimisations to
Transaction.upsert:Partition-range augmentation. New helper
upsert_util.augment_filter_with_partition_rangesderives[min, max]predicates fromdffor every partition source columnpresent in the frame and ANDs them into the row filter.
inclusive_projectionthen projects each range through itspartition transform (
hours,days,months,years,identity,truncate) at scan-plan time, enabling manifest- and file-levelpruning.
Column-projection for the insert-only path. When
when_matched_update_all=Falsethe consumer loop only readsjoin_colsoff each destination batch (to build the per-batchsource-side match filter). Passing
selected_fields=tuple(join_cols)to
DataScanlets the parquet reader prune wide non-key columns.The existing
_projected_field_ids.union(extract_field_ids(...))keeps the partition-range predicate's columns readable.
Correctness guards
The augmentation skips per-column in three cases:
df(no bound to derive).df(nomin/max).df— a non-nullGreaterThanOrEqualpredicate would exclude NULL-partition destination rows whose
(join_cols)may collide with null-partition source rows. Skippruning over emitting an unsafe predicate.
When
min == max, anEqualTois emitted instead of the range pair.Multiple partition fields sourcing from the same column emit one
source-column range;
inclusive_projectionprojects through eachpartition field independently at scan time. Bucket and other
non-monotonic transforms return
Nonefrom theirprojectmethod oninequalities — the projection contributes
AlwaysTruefor them, noharm.
Are these changes tested?
Yes:
tests/table/test_upsert.py:augment_filter_with_partition_ranges(unpartitioned, missing column, all-null, partial-null,
single-value, range, multi-field-sharing-source).
join_cols, INjoin_cols, and unpartitioned.DataScan.plan_files()count assertion against a deterministicallyseeded table that defeats per-file metrics pruning — confirms the
augmentation prunes vs the original predicate.
selected_fieldsprojection assertions for bothwhen_matched_update_all=True(legacy('*',)) and=False(narrow
join_cols-only).when_matched_update_all=Trueagainst awide table to confirm column projection doesn't break the update
path.
Smoke test — real Iceberg-on-S3 + Glue table
Run against a real Iceberg table representative of the workload this
optimisation targets.
Stack
pyiceberg.catalog.glue.GlueCatalogTarget table
unique_keys:[conversation_id, id](composite UUID/string key)partition_spec:hours(created_at)conversation_id(string, UUID v4) — keyid(string, UUID v4) — keyevent(string, short tag, ~10 B/row)log(string, JSON payload, ~400 B/row median)created_at(timestamp[us, UTC]) — partition sourceversion(int32)Source synthesis (two modes for the comparison):
synthetic: random UUIDs;conversation_iddrawn from a pool sizedrows/30so leading-key cardinality matches realistic parent-childdistribution;
created_atuniformly in[now − hours, now]. Keysdon't overlap destination → metrics evaluator rejects every file at
scan-plan time, so both projections return 0 files. Isolates the
planning cost.
from-destination: reads N recent rows from the destination via acreated_atrange scan, used as the source. Keys DO overlap →exercises the file-count reduction and column-read effect.
Results (read-only via
DataScan.plan_files()andto_arrow_batch_reader()):The 146× plan-time win is on a 1 000-disjunct predicate against a
~10k-file table; the original cost scales linearly with table file
count and predicate disjunct count, the augmented cost scales with the
source's
created_atspan instead.The 86% byte reduction is dominated by skipping the
log(JSONpayload) column at the parquet reader — that one column carries ~80%
of the row width on this table.
For a representative larger flush — 1.16M source rows spanning ~24 h
— extrapolating both wins reduces the destination-scan working set
from multiple GiB (which is OOM-territory on 8 GiB worker pods) down
to tens of MiB.
Are there any user-facing changes?
No API change. The optimisation is purely internal to
Transaction.upsert:pyiceberg.table.upsert_utilfortestability but isn't part of the public API.
selected_fields=tuple(join_cols)is passed conditionally inside themethod — no signature change to
Table.upsertorTransaction.upsert.Why a range augmentation rather than reusing
_build_partition_predicate?Transaction._build_partition_predicateplus_determine_partitionstogether could express the same intent — apply each partition
transform to
df, take the distinct partition tuples, and emitOr(And(EqualTo, …), …). It would prune marginally harder forgappy sources (where
[min, max]over-fetches the gap). I pickedthe range approach over that combination for three reasons:
Predicate size. Exact partition match emits one disjunct per
distinct partition value present in
df. For a daily-partitionedtable with a multi-month source the
Orreaches hundreds of nodes— exactly the predicate-bloat shape that motivated Optimize upsert performance for large datasets #2943. The range
approach is 2 nodes per partition column regardless of cardinality
and downstream metrics-evaluator cost scales with that.
Reuse boundary.
_determine_partitionsis bound to the writepath: it filters +
combine_chunksper partition for the writerto consume. Reusing it for read-side planning either wastes that
work or requires lifting the partition-key extraction into a shared
helper — a separable, slightly larger refactor.
Idiomatic projection.
GreaterThanOrEqual(source_col, min)feeds
inclusive_projectionsource-side bounds and lets theexisting
transform.project(...)machinery rewrite them intopartition-column predicates at scan time. That's the same
projection path the rest of pyiceberg uses.
For temporally-dense sources (the dominant upsert pattern: a recent
batch of activity, no internal gaps) the two approaches prune the same
files. For gappy sources the exact-match approach prunes strictly
harder at the cost of a larger predicate. Happy to switch to the
_determine_partitions+_build_partition_predicatecombo ifreviewers prefer that direction — or to factor a thin
partition_records_from_arrow_tablehelper out of_determine_partitionsso both sides can share it.Related
@koenvo), Upserting large table extremely slow #2159(umbrella slow-upsert tracker), Upsert with 1M rows extremely slow due to
create_match_filterandtxn.delete()performance #3129 (recent:create_match_filterapproach — that PR shrinks the row predicate itself; this one adds
partition pruning the row predicate can't trigger on its own. Both
can coexist.
Was generative AI tooling used to co-author this PR?