[python][daft] Make Daft Paimon read source serializable #8029

Open
kerwin-zk wants to merge 3 commits into apache:master from kerwin-zk:fix-daft-read-source-serializable

@kerwin-zk
Contributor

Purpose

Make the Daft Paimon read source serializable when running with Ray.

Previously, PaimonDataSource and fallback read tasks could retain live
FileStoreTable, FileIO, StorageConfig, or TableRead objects. With
remote filesystems such as OSS/Jindo, Ray failed to serialize the execution
plan because those objects may contain non-picklable PyArrow filesystem state.

RuntimeError: Failed to serialize: OtherString("TypeError: no default __reduce__ due to non-trivial __cinit__")

Tests

CI

file_io = getattr(table, "file_io", None)
properties = getattr(file_io, "properties", None)
if properties is None:
    properties = getattr(file_io, "catalog_options", None)

Contributor

We can obtain all of the properties information through the properties attribute, even for RESTTokenFileIO. However, to make the code more robust, we need to check whether the FileIO is a CachingFileIO, in which case the properties must be obtained from _delegate.
I further recommend unifying the abstract properties API for FileIO on the pypaimon side, because we currently have too many getattr calls.

Contributor Author

Done. Added a properties property on CachingFileIO that delegates to its _delegate, so every FileIO implementation now exposes .properties uniformly. _extract_catalog_options reads table.file_io.properties directly, with no per-implementation getattr.
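A minimal sketch of the delegation described above. The classes here are hypothetical stand-ins, not the actual pypaimon implementations, and `extract_catalog_options` only approximates what `_extract_catalog_options` might do:

```python
from typing import Dict


class LocalFileIO:
    """Hypothetical stand-in for a concrete FileIO implementation."""

    def __init__(self, properties: Dict[str, str]) -> None:
        self.properties = dict(properties)


class CachingFileIO:
    """Hypothetical stand-in for a wrapper that forwards to a delegate."""

    def __init__(self, delegate: LocalFileIO) -> None:
        self._delegate = delegate

    @property
    def properties(self) -> Dict[str, str]:
        # Forward to the wrapped FileIO so callers never touch _delegate.
        return self._delegate.properties


def extract_catalog_options(file_io) -> Dict[str, str]:
    # Works for wrapped and unwrapped FileIO alike. The copy keeps the
    # result free of references back to the live object.
    return dict(file_io.properties)


plain = LocalFileIO({"warehouse": "oss://bucket/warehouse"})
cached = CachingFileIO(plain)
```

With a uniform `.properties`, the caller needs neither `getattr` fallbacks nor an `isinstance` check for the caching wrapper.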

if identifier is None:
    return None

get_database_name = getattr(identifier, "get_database_name", None)

Contributor

Maybe it's fine to call identifier.get_database_name and identifier.get_table_name directly, rather than through getattr.

Contributor Author

Done, calling identifier.get_database_name() / get_table_name() / get_branch_name() directly now. This also fixes a latent issue: the old getattr fallback used identifier.object, which is the encoded object name and would round-trip incorrectly for branch tables.
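To illustrate the round-trip issue, here is a rough sketch with a hypothetical `Identifier` stand-in. The `table$branch_name` encoding of `object` is an assumption made for this example, and the real pypaimon class may differ:

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class Identifier:
    """Hypothetical stand-in; not the real pypaimon Identifier."""

    database: str
    table: str
    branch: Optional[str] = None

    @property
    def object(self) -> str:
        # Assumed encoding: the branch is folded into the object name.
        if self.branch is None:
            return self.table
        return f"{self.table}$branch_{self.branch}"

    def get_database_name(self) -> str:
        return self.database

    def get_table_name(self) -> str:
        return self.table

    def get_branch_name(self) -> Optional[str]:
        return self.branch


def to_parts(identifier: Identifier) -> Tuple[str, str, Optional[str]]:
    # Serialize plain fields obtained through the accessor methods.
    return (
        identifier.get_database_name(),
        identifier.get_table_name(),
        identifier.get_branch_name(),
    )


def from_parts(parts: Tuple[str, str, Optional[str]]) -> Identifier:
    return Identifier(*parts)


def naive_round_trip(identifier: Identifier) -> Identifier:
    # Old-style fallback: treats the encoded object name as the table name.
    return Identifier(identifier.database, identifier.object)


original = Identifier("db", "orders", "dev")
rebuilt = from_parts(to_parts(original))
naive = naive_round_trip(original)
```

Under these assumptions, `rebuilt` equals `original`, while `naive` ends up with the encoded name as its table name and no branch.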



def _extract_table_options(table: FileStoreTable) -> dict[str, Any]:
    table_schema = getattr(table, "table_schema", None)

Contributor

Maybe we should define a schema method in FileStoreTable.

Contributor Author

Done. Added FileStoreTable.schema() returning the TableSchema, and _extract_table_options now uses table.schema().options.

)

if can_use_native_reader:
    use_paimon_reader_task = (

Contributor

Please provide detailed notes here regarding the scenarios in which the Daft Native Reader can be used, and those in which the Paimon Reader is necessary.

Contributor Author

Added detailed comments above the reader-selection logic in get_tasks, describing when the Daft native Parquet reader is used and when the pypaimon reader task is required (non-Parquet, blob columns, LSM merge, or deletion vectors).
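The selection rule can be sketched roughly as follows. `SplitInfo` and `needs_paimon_reader` are hypothetical names used only for illustration; the actual logic lives in get_tasks and may be structured differently:

```python
from dataclasses import dataclass


@dataclass
class SplitInfo:
    """Hypothetical summary of what a split requires; not a pypaimon type."""

    is_parquet: bool
    has_blob_columns: bool
    needs_lsm_merge: bool
    has_deletion_vectors: bool


def needs_paimon_reader(split: SplitInfo) -> bool:
    # Any one of these conditions rules out the native Parquet reader.
    return (
        not split.is_parquet
        or split.has_blob_columns
        or split.needs_lsm_merge
        or split.has_deletion_vectors
    )


append_only_parquet = SplitInfo(True, False, False, False)
primary_key_split = SplitInfo(True, False, True, False)
```

Only a plain Parquet split with none of the special requirements is eligible for the native reader in this sketch.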

    return not self._is_parquet or self._has_blob_columns or self._table.is_primary_key_table

def _requires_serializable_paimon_reader_task(self) -> bool:
    if self._warehouse_scheme in ("", "file"):

Contributor

In a Ray environment, we expect a scan of a normal append-only Paimon table located on Aliyun OSS to run through the Daft native reader. But in this case _warehouse_scheme is oss, so this method returns True and use_paimon_reader_task becomes True, which does not meet our expectations. Please correct me if I'm wrong.

Contributor Author

You're right, thanks. I removed the gate: a normal append-only Parquet table on OSS now goes through Daft's native reader under the Ray runner. This is safe because both the source and the pypaimon fallback task serialize only rebuildable metadata (catalog options, identifier, table path), and the native task carries Daft's own picklable StorageConfig. The pypaimon reader task is now used only for splits that genuinely need it (PK/LSM merge, non-Parquet, blob columns, deletion vectors). I verified end-to-end that community Daft reading an append-only Parquet table on OSS under the Ray runner uses the native reader and returns correct results.
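A minimal sketch of the "serialize only rebuildable metadata" pattern, using plain `pickle`. `ReadTask` and `open_table` are hypothetical names, and a `threading.Lock` stands in for live, non-picklable filesystem state; this is not the code in the PR:

```python
import pickle
import threading
from typing import Dict, Optional


def open_table(catalog_options: Dict[str, str], identifier: str) -> dict:
    """Hypothetical loader standing in for a catalog lookup."""
    # The lock mimics a live handle that pickle cannot serialize.
    return {
        "options": dict(catalog_options),
        "identifier": identifier,
        "handle": threading.Lock(),
    }


class ReadTask:
    def __init__(self, catalog_options: Dict[str, str], identifier: str) -> None:
        self.catalog_options = dict(catalog_options)
        self.identifier = identifier
        self._table: Optional[dict] = None  # live object, built lazily

    @property
    def table(self) -> dict:
        if self._table is None:
            self._table = open_table(self.catalog_options, self.identifier)
        return self._table

    def __getstate__(self) -> dict:
        # Drop the live object; the receiver rebuilds it from metadata.
        state = self.__dict__.copy()
        state["_table"] = None
        return state


task = ReadTask({"warehouse": "oss://bucket/warehouse"}, "db.orders")
_ = task.table  # force creation of the live object before pickling
restored = pickle.loads(pickle.dumps(task))
```

Pickling `task.table` directly would raise a `TypeError` because of the lock, whereas the task itself round-trips and reopens the table on first access.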

@YannByron
Contributor

@kerwin-zk Thank you for working on this. I left some comments.


2 participants